이 글에서는 항공 데이터를 이용하여 2005~2008년 사이에 항공사별 운행 지연 횟수와 지연 시간의 평균을 하둡을 이용하여 구해보도록 하겠습니다.


먼저 하둡에서 실행할 jar파일을 만들기 위해 이클립스를 실행합니다.


파싱 클래스 생성

하둡으로 입력한 csv의 내용을 맵퍼(mapper)에서 파싱할 클래스를 만듭니다. 이 예제에서는 입력받은 csv파일을 한 줄씩 받아서 항공사 코드 값(9번째 열), 지연 시간(25번째 열)만 전달받으면 되기 때문에 이 과정을 도와줄 클래스를 따로 만들었습니다.

코드 보기
package com.jin.Ex10;

import org.apache.hadoop.io.Text;

public class AirlineParser {
	private String UniqueCarrier;
	private int CarrierDelay;
	
	final static int SUSPENSIONSOFAIRLINE = -1;
	
	private int getDigitFromStr(String data, int def) {
		if("NA".equals(data)) return def;
		return Integer.parseInt(data);
	}

	public AirlineParser(Text txt) {
		String [] airdata = txt.toString().split(",");
		UniqueCarrier = airdata[8];
		
		CarrierDelay = getDigitFromStr(airdata[24], SUSPENSIONSOFAIRLINE);
	}

	public Text getCarrier() {
		return new Text(UniqueCarrier);
	}

	public int getDelaytime() {
		return CarrierDelay;
	}
	
	
}


mapper 생성

위에서 만든 클래스를 활용하여 맵퍼를 구현합니다. 지연 시간이 0보다 큰 값들만 리듀서에 전달합니다.

코드 보기
package com.jin.Ex10;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;


public class Map extends MapReduceBase implements 
Mapper<LongWritable, Text, Text, IntWritable>{
	@Override
	public void map(LongWritable key, Text value, 
			OutputCollector<Text, IntWritable> output, 
			Reporter reporter)
			throws IOException {
		AirlineParser parser = new AirlineParser(value);
		
		if(0 < parser.getDelaytime()) {
			output.collect(new Text(parser.getCarrier())	, 
			       new IntWritable(parser.getDelaytime()));	

		}	
	}
}


reducer 생성

맵퍼에서 전달한 값을 처리하는 리듀서를 구현합니다.

getTotalCnt는 지연 횟수와 지연 시간의 합을 구해주는 역할을 합니다. 둘을 구한 후 "지연 시간의 합, 지연 횟수"을 리턴합니다. 리턴받은 값을 이용하여 지연 시간의 합을 횟수로 나누어 평균을 구한 후 최종 단계로 보냅니다.

image

코드 보기
package com.jin.Ex10;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class Reduce extends MapReduceBase implements 
Reducer<Text, IntWritable, Text, Text>{
	private String getTotalCnt(Iterator<IntWritable> values) {
		int cnt=0;
		int sum=0;
		
		while(values.hasNext()) {
			
			cnt ++; 
			sum = sum + values.next().get();
		}
		return sum + "," + cnt;
	}
	@Override
	public void reduce(Text key, Iterator<IntWritable> values, 
			OutputCollector<Text, Text> output, 
			Reporter reporter)
			throws IOException {
		String sumCnt = getTotalCnt(values);
		String [] data = sumCnt.split(",");
		int sum = Integer.parseInt(data[0]);
		int cnt = Integer.parseInt(data[1]);
		float avg = (float)sum/(float)cnt;

		
		output.collect(key, new Text(sum + "," + avg));
	}
}


메인 클래스 생성

맵퍼와 리듀서를 전달받아 전체 프로세스를 진행하는 메인 클래스를 구현합니다. 맵퍼에서 전달하는 형식<Text, IntWritable>과 리듀서에서 전달하는 형식<Text, Text>이 다르기 때문에 setMapOutputKeyClass, setMapOutputValueClass 를 반드시 입력해야합니다.

코드 보기
package com.jin.Ex10;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Value extends Configured implements Tool {
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Value(), args);
		System.exit(exitCode);
	}
	@Override
	public int run(String[] arg0) throws Exception {
		JobConf conf = new JobConf(Value.class);
		
		conf.setJobName("Value Test");
		
		conf.setMapOutputKeyClass(Text.class);
		conf.setMapOutputValueClass(IntWritable.class);
				
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);
		
		conf.setMapperClass(Map.class);
		conf.setReducerClass(Reduce.class);
		
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		// /airdata
		FileInputFormat.setInputPaths(conf, new Path(arg0[0]));
		// /output
		FileOutputFormat.setOutputPath(conf, new Path(arg0[1]));
		
		JobClient.runJob(conf);
		return 0;
	}

}


여기까지 만든 후 jar형식으로 export하여 하둡으로 옮깁니다.


하둡에 파일 입력

input 폴더 만들기

hadoop fs -mkdir /input

input 폴더에 csv 입력

hadoop fs -put /usr/local/hadoop/data/* /input


하둡 실행 및 출력

하둡 맵리듀스 실행

hadoop jar Ex10.jar com.jin.Ex10.Value /input /output

맵리듀스 결과 확인(ls로 파일 이름 확인 필수)

hadoop fs -cat /output/part-00000

결과 파일을 원하는 폴더로 이동

hadoop fs -get /output/part-00000 /[원하는 위치]/[이름.확장자]