hadoop-streaming : writing output to different files


Here is the scenario:

           Reducer1  
         /  
Mapper - - Reducer2  
         \   
           ReducerN  

In the reducer I want to write the data to different files; let's say the reducer looks like this:

def reduce():
    for line in sys.stdin:
        if line == type1:
            create_type_1_file(line)
        elif line == type2:
            create_type_2_file(line)
        elif line == type3:
            create_type_3_file(line)
        # ... and so on

def create_type_1_file(line):
    # writes to file1

def create_type_2_file(line):
    # writes to file2

def create_type_3_file(line):
    # writes to file3

Consider the paths to write to as:

file1 = /home/user/data/file1  
file2 = /home/user/data/file2  
file3 = /home/user/data/file3  

When I run this in pseudo-distributed mode (a single machine with the HDFS daemons running), everything is fine, since all the daemons write to the same set of files.

Question: If I run this on a cluster of 1000 machines, will they still write to the same set of files? I am writing to the local filesystem in this case. Is there a better way to perform this operation in hadoop streaming?


Typically the output of reduce is written to a reliable storage system like HDFS, because if one of the nodes goes down, the reduce data associated with that node is lost, and it's not possible to run that particular reduce task again outside the context of the Hadoop framework. Also, once the job is complete, the output from the 1000 nodes has to be consolidated for the different input types.

Concurrent writing is not supported in HDFS: if multiple reducers write to the same HDFS file, the file may be corrupted. And when multiple reduce tasks run on a single node, concurrency is a problem for writes to a single local file as well.

One solution is to use a reduce-task-specific file name and later combine all the files for a given input type; a sketch of this follows.
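Here is a minimal sketch of that idea for a streaming reducer. It assumes the task attempt id is exposed to the task in the mapred_task_id environment variable (Hadoop streaming exports job configuration values with dots replaced by underscores; the exact variable name depends on your Hadoop version) and, purely for illustration, that each input line carries its type as the first tab-separated field:

#!/usr/bin/env python
import os
import sys

# Task-specific suffix, so no two reduce tasks ever share a file.
# "mapred_task_id" is set by Hadoop streaming; the name may vary by version.
task_id = os.environ.get("mapred_task_id", "local-test")
out_dir = "/home/user/data"

open_files = {}

def file_for(line_type):
    # Lazily open one file per (type, task) pair,
    # e.g. /home/user/data/type1-attempt_..._r_000007_0
    if line_type not in open_files:
        path = os.path.join(out_dir, "%s-%s" % (line_type, task_id))
        open_files[line_type] = open(path, "w")
    return open_files[line_type]

for line in sys.stdin:
    line_type = line.split("\t", 1)[0]  # assumption: type is the first field
    file_for(line_type).write(line)

for f in open_files.values():
    f.close()

After the job completes, all the files for one type can be concatenated in a separate step (for example with hadoop fs -getmerge, if they were written to HDFS rather than the local disk, which per the paragraph above is the more reliable choice).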


Output can be written from the reducer to more than one location using the MultipleOutputs class. You can regard file1, file2 and file3 as three folders and write the 1000 reducers' output data to these folders separately.


Usage pattern for job submission:

Job job = new Job();

FileInputFormat.setInputPath(job, inDir);

// outDir is the root path; in this case outDir = "/home/user/data/"
FileOutputFormat.setOutputPath(job, outDir);

// You have to assign the output format class. Using MultipleOutputs this way
// still creates the zero-sized default output, e.g. part-00000. To prevent
// this, use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
// instead of job.setOutputFormatClass(TextOutputFormat.class); in your
// Hadoop job configuration.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);

...

job.waitForCompletion(true);

Usage in Reducer:

private MultipleOutputs<Text, Text> out;

public void setup(Context context) {
    out = new MultipleOutputs<Text, Text>(context);
    ...
}

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    // '/' characters in baseOutputPath are translated into directory levels
    // in your file system. Also, append your custom-generated path with
    // "part" or similar, otherwise your output files will be named -00000,
    // -00001 etc. No call to context.write() is necessary.
    for (Text line : values) {
        // type1/type2/type3 stand for whatever test distinguishes the line
        // types in the question; note that Java needs equals(), not ==,
        // to compare content.
        if (line.toString().equals(type1))
            out.write(key, line, "file1/part");
        else if (line.toString().equals(type2))
            out.write(key, line, "file2/part");
        else if (line.toString().equals(type3))
            out.write(key, line, "file3/part");
    }
}

protected void cleanup(Context context) throws IOException, InterruptedException {
    out.close();
}
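For illustration, with outDir = "/home/user/data/" and the base paths above, a job with multiple reducers would leave a directory layout along these lines (the part-r-NNNNN suffix is the default naming of the new MapReduce API; the reducer numbers here are made up):

/home/user/data/file1/part-r-00000
/home/user/data/file1/part-r-00001
/home/user/data/file2/part-r-00000
/home/user/data/file3/part-r-00000
...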

Ref: https://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

