Tuesday, 8 July 2014

Generating ORC files using MapReduce

The ORC file format was introduced in Hive 0.11. It achieves excellent compression ratios through techniques such as run-length encoding. Data stored in ORC format can be read through HCatalog, so any Pig or MapReduce program can work with it seamlessly.
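
To make the compression claim concrete, here is a minimal sketch of run-length encoding on a column of integers. It illustrates the general idea only and is not ORC's actual encoder, which is more elaborate. The class and method names (RunLengthSketch, encode, decode) are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class RunLengthSketch {

    // Collapses consecutive equal values into {value, runLength} pairs.
    public static List<long[]> encode(long[] values) {
        List<long[]> runs = new ArrayList<>();
        for (long v : values) {
            long[] last = runs.isEmpty() ? null : runs.get(runs.size() - 1);
            if (last != null && last[0] == v) {
                last[1]++;                      // extend the current run
            } else {
                runs.add(new long[] {v, 1});    // start a new run
            }
        }
        return runs;
    }

    // Expands {value, runLength} pairs back into the original sequence.
    public static long[] decode(List<long[]> runs) {
        int total = 0;
        for (long[] run : runs) {
            total += (int) run[1];
        }
        long[] out = new long[total];
        int i = 0;
        for (long[] run : runs) {
            for (long n = 0; n < run[1]; n++) {
                out[i++] = run[0];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] column = {7, 7, 7, 7, 9, 9, 7};
        for (long[] run : encode(column)) {
            System.out.println(run[0] + " x " + run[1]);
        }
    }
}
```

Seven values collapse into three runs here. Columns with long stretches of repeated values, which are common in columnar storage, benefit the most.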

Now we will see how to write ORC files from a MapReduce program.

The ORCMapper class is as shown below:

ORCMapper.java
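import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

// NCDCParser is part of the source linked at the end of this post;
// import it from wherever it lives in your project.
public class ORCMapper extends Mapper<LongWritable, Text, NullWritable, Writable> {

  private final OrcSerde serde = new OrcSerde();

  // Define the struct which will represent each row in the ORC file
  private final String typeString = "struct<air_temp:double,station_id:string,lat:double,lon:double>";

  private final TypeInfo typeInfo = TypeInfoUtils
      .getTypeInfoFromTypeString(typeString);
  private final ObjectInspector oip = TypeInfoUtils
      .getStandardJavaObjectInspectorFromTypeInfo(typeInfo);

  private final NCDCParser parser = new NCDCParser();

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    parser.parse(value.toString());

    if (parser.isValid()) {

      // Values must be added in the same order as the fields in typeString
      List<Object> struct = new ArrayList<Object>(4);
      struct.add(parser.getAirTemp());
      struct.add(parser.getStationId());
      struct.add(parser.getLatitude());
      struct.add(parser.getLongitude());

      // Wrap the row together with its ObjectInspector for the ORC writer
      Writable row = serde.serialize(struct, oip);

      context.write(NullWritable.get(), row);
    }
  }
}

Each row in an ORC file is a struct, so we define the struct type in typeString. OrcSerde uses the ObjectInspector built from this type info to serialize each row into the ORC format.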
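
Because the row is handed to the serde as a plain list, the values have to be added in the same order as the fields of the struct. The sketch below makes that ordering visible by splitting a flat struct type string into its field names and types. StructFields is a hypothetical helper written for this illustration; it handles only flat structs of primitive types and is not how Hive's TypeInfoUtils parses type strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StructFields {

    // Splits "struct<a:double,b:string>" into an ordered name -> type map.
    // Assumption: a flat struct with primitive field types only (no nesting).
    public static Map<String, String> parse(String typeString) {
        String s = typeString.trim();
        if (!s.startsWith("struct<") || !s.endsWith(">")) {
            throw new IllegalArgumentException("Not a struct type: " + typeString);
        }
        String body = s.substring("struct<".length(), s.length() - 1);
        // LinkedHashMap keeps the fields in declaration order
        Map<String, String> fields = new LinkedHashMap<>();
        for (String field : body.split(",")) {
            String[] nameAndType = field.split(":", 2);
            if (nameAndType.length != 2) {
                throw new IllegalArgumentException("Malformed field: " + field);
            }
            fields.put(nameAndType[0].trim(), nameAndType[1].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> fields =
            parse("struct<air_temp:double,station_id:string,lat:double,lon:double>");
        int position = 0;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            System.out.println(position++ + ": " + e.getKey() + " (" + e.getValue() + ")");
        }
    }
}
```

The printed positions correspond to the positions in the list built inside the mapper: index 0 holds the air temperature, index 1 the station id, and so on.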

Now onto the driver class.

RunORC.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RunORC extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new RunORC(), args);
    System.exit(res);
  }

  // Arguments: <input path> <output path> <number of reduce tasks>
  @Override
  public int run(String[] arg) throws Exception {
    Configuration conf = getConf();

    // Set ORC configuration parameters
    conf.set("orc.create.index", "true");

    Job job = Job.getInstance(conf);
    job.setJarByClass(RunORC.class);
    job.setJobName("ORC Output");
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Writable.class);

    job.setMapperClass(ORCMapper.class);

    job.setNumReduceTasks(Integer.parseInt(arg[2]));
    job.setOutputFormatClass(OrcNewOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(arg[0]));
    Path output = new Path(arg[1]);

    OrcNewOutputFormat.setCompressOutput(job, true);
    OrcNewOutputFormat.setOutputPath(job, output);

    return job.waitForCompletion(true) ? 0 : 1;
  }
}

The output format is set to OrcNewOutputFormat with job.setOutputFormatClass. ORC configuration parameters, such as orc.create.index here, are set on the Configuration before the Job is created from it.
The driver takes three arguments: the input path, the output path and the number of reduce tasks.
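
The driver reads its arguments by position without checking them, so a missing or non-numeric argument only shows up as an exception from inside run. A minimal sketch of validating them up front follows. The JobArgs class is hypothetical and not part of the original source; it assumes only the argument order used by the driver (input path, output path, number of reduce tasks), and /weatherin is a placeholder input path.

```java
public class JobArgs {

    public final String input;
    public final String output;
    public final int numReduceTasks;

    private JobArgs(String input, String output, int numReduceTasks) {
        this.input = input;
        this.output = output;
        this.numReduceTasks = numReduceTasks;
    }

    // Validates the three positional arguments and fails with a usage message otherwise.
    public static JobArgs parse(String[] args) {
        if (args == null || args.length != 3) {
            throw new IllegalArgumentException(
                "Usage: RunORC <input path> <output path> <number of reduce tasks>");
        }
        int reducers;
        try {
            reducers = Integer.parseInt(args[2]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Number of reduce tasks must be an integer: " + args[2], e);
        }
        if (reducers < 0) {
            throw new IllegalArgumentException(
                "Number of reduce tasks must not be negative: " + reducers);
        }
        return new JobArgs(args[0], args[1], reducers);
    }

    public static void main(String[] args) {
        // "/weatherin" is a placeholder input path for this example
        JobArgs parsed = parse(new String[] {"/weatherin", "/weatherout", "0"});
        System.out.println(parsed.input + " -> " + parsed.output
            + " with " + parsed.numReduceTasks + " reduce tasks");
    }
}
```

Inside run, the parsed fields would take the place of the direct array accesses. Passing 0 as the third argument makes the job map-only, so each mapper writes its rows straight to the ORC output.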

Dependencies
To build and run this program, add the following dependencies to your pom.xml:
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>0.13.1</version>
    </dependency>
  </dependencies>

Creating table in Hive
Once the job has run, you can create a Hive table over its output as follows (assuming the job wrote its output to /weatherout):

  create external table ncdc (temp double, station_id string, lat double, lon double) stored as orc location "/weatherout";  

The source for this can be downloaded from:
https://github.com/gautamphegde/HadoopCraft

Note: This code has been tested on Hortonworks Data Platform 2.0.