当前位置: 老葡京网站娱乐 > 数据库 > 其它数据库 > 正文

如何利用 CombineFileInputFormat 把 Netflix data set 导入到 HBase 里

时间:2015-01-29 csdn博客 维尼弹着肖邦的夜曲
 老葡京网站娱乐 www.sdguanhua.com 
package com.mr.test;  
      
import java.io.IOException;  
      
import org.apache.hadoop.io.BytesWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;  
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;  
      
/**
 * A {@link CombineFileInputFormat} producing {@link LongWritable} keys and
 * {@link BytesWritable} values. Each chunk of a {@link CombineFileSplit} is read
 * by a {@code CombineSmallfileRecordReader}.
 */
public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> {

    /**
     * Builds a {@link CombineFileRecordReader} over the given split that delegates
     * every chunk to {@code CombineSmallfileRecordReader}, and initializes it.
     *
     * @param split   the input split; must be a {@link CombineFileSplit}
     * @param context the task attempt context passed on to the reader
     * @return the initialized combined record reader
     * @throws IOException      if the reader cannot be created or initialized
     * @throws RuntimeException if the thread is interrupted during initialization
     */
    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {

        CombineFileSplit combineFileSplit = (CombineFileSplit) split;
        CombineFileRecordReader<LongWritable, BytesWritable> recordReader =
                new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class);
        try {
            recordReader.initialize(combineFileSplit, context);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and actually raise the failure; the exception
            // used to be constructed but never thrown, hiding the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error to initialize CombineSmallfileRecordReader.", e);
        }
        return recordReader;
    }

}
package com.mr.test;  
      
import java.io.IOException;  
      
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.BytesWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;  
import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;  
      
public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> {  
      
    private CombineFileSplit combineFileSplit;  
    private LineRecordReader lineRecordReader = new LineRecordReader();  
    private Path[] paths;  
    private int totalLength;  
    private int currentIndex;  
    private float currentProgress = 0;  
    private LongWritable currentKey;  
    private BytesWritable currentValue = new BytesWritable();  
      
    public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index) throws IOException {  
        super();  
        this.combineFileSplit = combineFileSplit;  
        this.currentIndex = index; // 当前要处理的小文件Block在CombineFileSplit中的索引  
    }  
      
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {  
        this.combineFileSplit = (CombineFileSplit) split;  
        // 处理CombineFileSplit中的一个小文件Block,因为使用LineRecordReader,需要构造一个FileSplit对象,然后才能够读取数据  
        FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());  
        lineRecordReader.initialize(fileSplit, context);  
      
        this.paths = combineFileSplit.getPaths();  
        totalLength = paths.length;  
        context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());  
    }  
      
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {  
        currentKey = lineRecordReader.getCurrentKey();  
        return currentKey;  
    }  
      
<strong><span style="color:#ff0000;">   @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {  
        System.out.println("lineRecordReader:"+lineRecordReader.getCurrentValue().toString());  
        byte[] content = lineRecordReader.getCurrentValue().toString().getBytes();  
        System.out.println("content:"+new String(content));  
        currentValue = new BytesWritable();  
        currentValue.set(content, 0, content.length);  
        System.out.println("currentValue:"+new String(currentValue.getBytes()));  
        return currentValue;  
    }</span></strong>  
    public static void main(String args[]){  
        BytesWritable cv = new BytesWritable();  
        String str1 = "1234567";  
        String str2 = "123450";  
        cv.set(str1.getBytes(), 0, str1.getBytes().length);  
        System.out.println(new String(cv.getBytes()));  
              
        cv.setCapacity(0);  
              
        cv.set(str2.getBytes(), 0, str2.getBytes().length);  
        System.out.println(new String(cv.getBytes()));  
    }  
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {  
        if (currentIndex >= 0 && currentIndex < totalLength) {  
            return lineRecordReader.nextKeyValue();  
        } else {  
            return false;  
        }  
    }  
      
    @Override
    public float getProgress() throws IOException {  
        if (currentIndex >= 0 && currentIndex < totalLength) {  
            currentProgress = (float) currentIndex / totalLength;  
            return currentProgress;  
        }  
        return currentProgress;  
    }  
      
    @Override
    public void close() throws IOException {  
        lineRecordReader.close();  
    }  
}

更多精彩内容:http://www.sdguanhua.com/database/extra/