Tuesday 8 July 2014

Generating ORC files using MapReduce

The ORC file format was introduced in Hive 0.11. It offers excellent compression ratios through the use of run-length encoding. Data stored in ORC format can be read through HCatalog, so any Pig or MapReduce program can work with it seamlessly.
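
For example, a mapper that reads an existing ORC-backed Hive table through HCatalog might look roughly like the sketch below. This is not part of the original example: the table layout (air_temp in column 0, station_id in column 1) is assumed, and the HCatalog package name (org.apache.hive.hcatalog vs the older org.apache.hcatalog) depends on your Hive release.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.HCatRecord;

// Sketch only: reads rows handed over by HCatInputFormat (set up in the driver
// with HCatInputFormat.setInput(job, "default", "ncdc")). Column positions are
// assumptions for illustration.
public class HCatORCReadMapper extends Mapper<WritableComparable, HCatRecord, Text, DoubleWritable> {

  @Override
  protected void map(WritableComparable key, HCatRecord value, Context context)
      throws IOException, InterruptedException {
    Double airTemp = (Double) value.get(0);    // column 0: air_temp
    String stationId = (String) value.get(1);  // column 1: station_id
    context.write(new Text(stationId), new DoubleWritable(airTemp));
  }
}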

Now we will see how to output ORC files from a MapReduce program.

The ORCMapper class is as shown below:

ORCMapper.java
1:  public class ORCMapper extends Mapper<LongWritable,Text,NullWritable,Writable> {  
2:    
3:    private final OrcSerde serde = new OrcSerde();  
4:    
5:    //Define the struct which will represent each row in the ORC file  
6:    private final String typeString = "struct<air_temp:double,station_id:string,lat:double,lon:double>";  
7:    
8:    private final TypeInfo typeInfo = TypeInfoUtils  
9:        .getTypeInfoFromTypeString(typeString);  
10:    private final ObjectInspector oip = TypeInfoUtils  
11:        .getStandardJavaObjectInspectorFromTypeInfo(typeInfo);  
12:    
13:    private final NCDCParser parser = new NCDCParser();  
14:    
15:    
16:    @Override  
17:    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
18:    
19:        parser.parse(value.toString());  
20:    
21:        if(parser.isValid()) {  
22:    
23:          List<Object> struct =new ArrayList<Object>(4);  
24:          struct.add(0, parser.getAirTemp());  
25:          struct.add(1, parser.getStationId());  
26:          struct.add(2, parser.getLatitude());  
27:          struct.add(3, parser.getLongitude());  
28:    
29:          Writable row = serde.serialize(struct, oip);  
30:    
31:          context.write(NullWritable.get(), row);  
32:        }  
33:    }  
34:  }  
35:    

Each row in an ORC file is a struct, so we define our row type as a struct (line no: 6). The TypeInfo built from this type string gives us an ObjectInspector, which the OrcSerde uses to serialize each row into the ORC format.
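
The NCDCParser used in the mapper is a plain helper class for the NCDC weather records, not something provided by Hadoop or Hive. A minimal sketch of the interface the mapper relies on is shown below; the field offsets are placeholders for illustration, not the exact NCDC record layout.

// Illustrative sketch of the parser interface the mapper expects.
// The substring offsets are placeholders, not the real NCDC record layout.
public class NCDCParser {

  private String stationId;
  private double airTemp;
  private double latitude;
  private double longitude;
  private boolean valid;

  public void parse(String record) {
    try {
      stationId = record.substring(4, 15).trim();
      latitude  = Double.parseDouble(record.substring(28, 34)) / 1000.0;
      longitude = Double.parseDouble(record.substring(34, 41)) / 1000.0;
      airTemp   = Double.parseDouble(record.substring(87, 92)) / 10.0;
      valid = true;
    } catch (RuntimeException e) {
      valid = false;  // malformed or truncated record
    }
  }

  public boolean isValid()     { return valid; }
  public double getAirTemp()   { return airTemp; }
  public String getStationId() { return stationId; }
  public double getLatitude()  { return latitude; }
  public double getLongitude() { return longitude; }
}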

Now onto the driver class.

RunORC.java
1:  public class RunORC extends Configured implements Tool {  
2:    
3:    
4:    public static void main(String[] args) throws Exception {  
5:    
6:      int res = ToolRunner.run(new Configuration(), new RunORC(), args);  
7:      System.exit(res);  
8:    
9:    }  
10:    
11:    public int run(String[] arg) throws Exception {  
12:      Configuration conf=getConf();  
13:    
14:      //Set ORC configuration parameters  
15:      conf.set("orc.create.index", "true");  
16:    
17:    
18:      Job job = Job.getInstance(conf);  
19:      job.setJarByClass(RunORC.class);  
20:      job.setJobName("ORC Output");  
21:      job.setMapOutputKeyClass(NullWritable.class);  
22:      job.setMapOutputValueClass(Writable.class);  
23:    
24:    
25:      job.setMapperClass(ORCMapper.class);  
26:    
27:      job.setNumReduceTasks(Integer.parseInt(arg[2]));  
28:      job.setOutputFormatClass(OrcNewOutputFormat.class);  
29:    
30:      FileInputFormat.addInputPath(job, new Path(arg[0]));  
31:      Path output=new Path(arg[1]);  
32:    
33:      OrcNewOutputFormat.setCompressOutput(job,true);  
34:      OrcNewOutputFormat.setOutputPath(job,output);  
35:    
36:      return job.waitForCompletion(true) ? 0: 1;  
37:    }  
38:    
39:    
40:  }  
41:    

We have specified the output format as OrcNewOutputFormat.class (line no: 28), the new-API ORC output format shipped with hive-exec.
We can also set ORC configuration parameters on the job configuration (line no: 15).
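
A few more ORC writer properties can be set the same way as orc.create.index at line no: 15. The keys below come from the ORC section of the Hive configuration properties page; whether a particular key is honored by OrcNewOutputFormat depends on your Hive version, so treat this as a sketch to verify rather than a guaranteed recipe.

// Sketch: extra ORC writer settings, applied like orc.create.index above.
// Verify against your Hive version before relying on them.
conf.set("orc.compress", "ZLIB");           // compression codec: NONE, ZLIB or SNAPPY
conf.set("orc.stripe.size", "67108864");    // stripe size in bytes (64 MB)
conf.set("orc.row.index.stride", "10000");  // rows between row index entries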

Dependencies
To run this program, add the following dependencies to your pom.xml:
1:   <dependencies>  
2:      <dependency>  
3:        <groupId>org.apache.hadoop</groupId>  
4:        <artifactId>hadoop-client</artifactId>  
5:        <version>2.2.0</version>  
6:      </dependency>  
7:    
8:      <dependency>  
9:        <groupId>org.apache.hive</groupId>  
10:        <artifactId>hive-exec</artifactId>  
11:        <version>0.13.1</version>  
12:      </dependency>  
13:    </dependencies>  

Creating table in Hive
Once you run the program, you can create a Hive table over the output as follows:

  create external table ncdc (temp double, station_id string, lat double, lon double) stored as orc location "/weatherout";  

The source for this can be downloaded from:
https://github.com/gautamphegde/HadoopCraft

Note: This code has been tested on Hortonworks Data Platform 2.0.

19 comments:

  1. I tried to do the same on a different use case but got the following error: "Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.Writable, received org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow". Any clue why this might be happening? Thanks in advance.

    Replies
    1. Hey anon,
      OrcSerdeRow implements Writable; can you show me the snippet that is throwing this error?

    2. Just changed the mapper to output constant values and it doesn't seem to work.
      Any clue why this is happening?

      public class ORCMapper extends Mapper<LongWritable, Text, NullWritable, Writable> {

      private final OrcSerde serde = new OrcSerde();

      //Define the struct which will represent each row in the ORC file
      private final String typeString = "struct<air_temp:string,station_id:string,lat:string,lon:string>";

      private final TypeInfo typeInfo = TypeInfoUtils
      .getTypeInfoFromTypeString(typeString);
      private final ObjectInspector oip = TypeInfoUtils
      .getStandardJavaObjectInspectorFromTypeInfo(typeInfo);


      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {


      List<Object> struct =new ArrayList<Object>(4);
      struct.add(0, "a");
      struct.add(1, "b");
      struct.add(2, "c");
      struct.add(3, "d");

      Writable row = serde.serialize(struct, oip);

      context.write(NullWritable.get(), row);
      }

      }

    3. I tried the same thing out and it worked fine for me. Is it giving any exception for you?

  2. I tested your program without changes on the NCDC sample dataset and got the same error. You might have posted a version of the code that had bugs.

    Replies
    1. I have tested the code with the NCDC weather data before committing it to GitHub. I am fairly confident that it works.

      I downloaded the dataset from the link below
      ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2009/

  3. I got the same error as before: "Type mismatch in value from map: expected org.apache.hadoop.io.Writable, received org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow".

  4. Hey. I found the mistake. I didn't set my number of reducers to zero. Sorry for the trouble. Thanks for a great article.

  5. Hi, I followed your example and it worked great, but what I really want to do is read in an ORC file and convert it to text. I have tried using just serde.deserialize(values) but it doesn't work. I read that you need to use serde.getObjectInspector(), but I don't know how to tie the two together to get a text file output. Any advice would be much appreciated!

  6. Hi, when I run the job, I got this exception:

    Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://service-10-0.local:8020/user/ppp/tmp/weatherout already exists

    I created the table at the location /user/ppp/tmp/weatherout.

    How can I fix this?

  7. Hi, I managed to run without errors, but no result file is generated in the output path, only _SUCCESS. I added System.out.println in the mapper to debug and it really does emit row data. Any ideas?

  8. We are facing the exact issue mentioned above. If the output path is /user/xyz/data/hive/testData, the directory just contains the _SUCCESS file and the actual data is written under /usr/xyz/part-00000.

    Did anyone else face the same issue? Thanks.

    Replies
    1. This comment has been removed by the author.

    2. I am facing the same issue now; I was searching for it and found your comment here. Have you fixed it? If so, please let me know; it may help me.

  9. ORC files have configuration options like stripe size and compression buffer size. Plus, it is often desirable to sort within each stripe based on some columns. How would you improve your code to handle these extras?

    Replies
    1. Stripe size and the like can be set through the configuration; see https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-ORCFileFormat

      As for sorting within a stripe, I guess you would have to do a secondary sort so that the records are in the sort order you want while being inserted into the ORC file.

  10. Could you post how to query a partitioned ORC table after inserting into it from a partitioned Hive managed table? I am failing to do so. And even after inserting the new partitions into the Hive ORC table, I could not query it using SELECT and WHERE!
    It should be in a Hive query...

  11. I have copy-pasted the same code with some changes and I am getting a NoClassDefFoundError. Please suggest what the reason might be.

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat
    at com.hpi.css.telephony.validation.ORCTest.run(ORCTest.java:44)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at com.hpi.css.telephony.validation.ORCTest.main(ORCTest.java:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 9 more

    Replies
    1. That means that OrcNewOutputFormat is not on the classpath at runtime, so you need to make sure it gets onto the classpath.

      One way to do this is to create an uber jar. If you check my example, I am creating an uber jar with the Maven assembly plugin that bundles all dependencies, which makes sure OrcNewOutputFormat is on the classpath.
