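The user configures a Hadoop job and submits it to the Hadoop framework, which breaks the job down into a series of map tasks and reduce tasks. The framework is responsible for distributing and executing the tasks, collecting the results, and monitoring job progress.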
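The figure below shows the stages a job goes through from start to finish, and who controls each stage (the user or the Hadoop framework).
The figure below details the work the user has to do when writing a MapReduce job, and the work the Hadoop framework performs automatically.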
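When writing a MapReduce program, the user specifies the input and output formats through InputFormat and OutputFormat respectively, and defines a Mapper and a Reducer to specify the work done in the map phase and the reduce phase. Inside a Mapper or Reducer, the user only writes the processing logic for a single key/value pair; the framework automatically iterates over and parses all key/value pairs in order and hands each pair to the Mapper or Reducer. At first glance, requiring all data to be in key/value form looks too simple to handle complex problems. In practice, a key or value can be made to carry several pieces of information by composing it: for example, by storing multiple fields in the key or value separated by a delimiter, or by making the value a serialized object that the Mapper deserializes before use. This is how applications with fairly complex input formats are handled.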
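The following plain-Java sketch illustrates the delimiter-separated variant of this idea. It does not use any Hadoop classes. It assumes a tab delimiter and uses hypothetical helper names `pack` and `unpack`. The code packs several fields into one value string and splits them back apart, the way a Mapper would on each incoming value.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper (not a Hadoop class): models storing several fields
// in one key or value by joining them with an assumed tab delimiter.
class CompositeValue {
    static final String DELIM = "\t";

    // Join the fields into a single string, e.g. to be emitted as a value.
    static String pack(String... fields) {
        return String.join(DELIM, fields);
    }

    // Split a packed value back into its fields inside the Mapper.
    // The -1 limit keeps trailing empty fields instead of dropping them.
    static List<String> unpack(String value) {
        return Arrays.asList(value.split(Pattern.quote(DELIM), -1));
    }

    public static void main(String[] args) {
        String value = pack("user42", "2013-05-01", "click");
        List<String> fields = unpack(value);
        System.out.println(fields.get(0) + " " + fields.get(2)); // prints "user42 click"
    }
}
```

This only works if no field contains the delimiter itself. When that cannot be guaranteed, the serialized-object approach mentioned above is the safer choice.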
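2.2 What the User Implements
When writing a MapReduce program, the user implements the following classes or methods:
(1) The InputFormat interface
The user implements this interface to specify the content format of the input files. The interface has two methods: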
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
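getSplits divides all input data into numSplits splits (numSplits is only a hint), and each split is processed by one map task. getRecordReader returns an iterator object for parsing a split: it parses each record in the split into a key/value pair.
Hadoop itself ships with several InputFormat implementations, for example TextInputFormat (the default), KeyValueTextInputFormat, SequenceFileInputFormat, and NLineInputFormat.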
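The division of labour between getSplits and getRecordReader can be modelled in plain Java. The sketch below is not Hadoop's FileInputFormat code. It makes three assumptions:

- The input is a single in-memory text "file".
- getSplits cuts it into roughly numSplits contiguous byte ranges.
- A hypothetical line reader emits (byte offset, line) pairs, similar in spirit to TextInputFormat.

To avoid cutting a record in half, each reader applies the usual line-oriented convention. It skips a partial first line, because that line belongs to the previous split. It also reads past its end boundary to finish the last line it started.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SplitSketch {
    // Hypothetical split type: the byte range [start, end) of the input.
    static final class Split {
        final int start, end;
        Split(int start, int end) { this.start = start; this.end = end; }
    }

    // Models getSplits: cut `length` bytes into about numSplits ranges.
    static List<Split> getSplits(int length, int numSplits) {
        List<Split> splits = new ArrayList<>();
        int size = (length + numSplits - 1) / numSplits; // ceiling division
        for (int start = 0; start < length; start += size) {
            splits.add(new Split(start, Math.min(start + size, length)));
        }
        return splits;
    }

    // Models a RecordReader: emit (offset, line) for every line that starts
    // inside the split; such a line is read to completion even if it
    // crosses split.end.
    static List<Map.Entry<Long, String>> readRecords(byte[] data, Split split) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        int pos = split.start;
        // Skip a partial first line: it was started by the previous split.
        if (pos > 0) {
            while (pos < data.length && data[pos - 1] != '\n') pos++;
        }
        while (pos < split.end) {
            int eol = pos;
            while (eol < data.length && data[eol] != '\n') eol++;
            String line = new String(data, pos, eol - pos, StandardCharsets.UTF_8);
            records.add(new SimpleEntry<>((long) pos, line));
            pos = eol + 1; // step past the newline
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbeta\ngamma\ndelta\n".getBytes(StandardCharsets.UTF_8);
        for (Split s : getSplits(data.length, 3)) {
            // One line of output per simulated map task.
            System.out.println(readRecords(data, s));
        }
    }
}
```

Every line is read by exactly one split, even though the split boundaries (bytes 8 and 16 here) fall in the middle of lines.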
(2) The Mapper interface
The user implements the Mapper interface to write a custom Mapper. The method that must be implemented is
void map(K1 key,
V1 value,
OutputCollector<K2,V2> output,
Reporter reporter
) throws IOException
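Here, the <K1, V1> pair is produced by the RecordReader obtained from the InputFormat. The OutputCollector collects the output of map(), and the Reporter is used to report the current task's progress.
Hadoop itself ships with several ready-made Mappers, for example IdentityMapper, InverseMapper, TokenCountMapper, and RegexMapper.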
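A word-count map function is the usual illustration of this signature. The sketch below is plain Java rather than Hadoop code. The nested `Collector` interface is a hypothetical stand-in for OutputCollector, and the Reporter parameter is omitted. K1 is the line offset, V1 the line text, K2 a word, and V2 the count 1.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class WordCountMapSketch {
    // Hypothetical stand-in for Hadoop's OutputCollector<K2, V2>.
    interface Collector<K, V> {
        void collect(K key, V value);
    }

    // Models map(K1 key, V1 value, OutputCollector<K2,V2> output, ...):
    // emit (word, 1) for every whitespace-separated word in the line.
    static void map(long offset, String line, Collector<String, Integer> output) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) { // an empty line splits into [""]
                output.collect(word, 1);
            }
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        map(0L, "to be or not to be", (k, v) -> out.add(new SimpleEntry<>(k, v)));
        System.out.println(out); // [to=1, be=1, or=1, not=1, to=1, be=1]
    }
}
```

The map function never sees more than one line at a time. Gathering all the 1s for the same word is left to the framework and the Reducer.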
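(3) The Partitioner interface
The user implements this interface to write a custom Partitioner, which decides which reduce task receives each key/value pair produced by the map tasks. A good Partitioner gives every reduce task a similar amount of data, which balances the load. The method to implement is
int getPartition(K2 key, V2 value, int numPartitions);
It returns the ID of the reduce task (a partition number from 0 to numPartitions - 1) that the <K2, V2> pair is sent to.
If the user does not supply a Partitioner, Hadoop uses the default one, HashPartitioner, which is essentially a hash function on the key.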
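Hadoop's default HashPartitioner computes the partition as `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The plain-Java sketch below reproduces that formula outside Hadoop and checks how evenly a set of synthetic keys spreads across three partitions. The class name is hypothetical.

```java
import java.util.Arrays;

class HashPartitionSketch {
    // Same formula as the default hash partitioning: clear the sign bit so
    // the result is non-negative, then take the remainder by the number of
    // reduce tasks. The value plays no part in the decision.
    static <K, V> int getPartition(K key, V value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        int[] load = new int[numPartitions];
        for (int i = 0; i < 9000; i++) {
            load[getPartition("key-" + i, null, numPartitions)]++;
        }
        // With well-distributed hash codes, each partition gets about 3000 keys.
        System.out.println(Arrays.toString(load));
    }
}
```

Masking with `Integer.MAX_VALUE` is preferred over `Math.abs` because `Math.abs(Integer.MIN_VALUE)` is still negative and would produce an invalid partition number. Because only the key is hashed, all pairs with the same key reach the same reduce task. A few very frequent keys can therefore still overload one reducer, which is when a custom Partitioner pays off.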
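(4) Combiner
A Combiner aggregates each map task's output locally before it is sent to the reduce tasks. This greatly reduces the amount of data transferred between map tasks and reduce tasks and can noticeably improve performance. In most cases the Combiner is the same class as the Reducer.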
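The effect of a Combiner can be seen by counting records before and after local aggregation. The sketch below is plain Java and models a sum combiner over one map task's word-count output; the class name is hypothetical. Reusing the Reducer as the Combiner is safe here because summing is commutative and associative. Partial sums computed on the map side and summed again in the reducer give the same total. Hadoop does not guarantee how many times a combiner runs, so this property matters.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CombinerSketch {
    // Models a sum combiner on one map task's output: every (word, 1)
    // pair with the same word is merged into a single (word, n) pair.
    static Map<String, Integer> combine(List<String> mapOutputKeys) {
        Map<String, Integer> combined = new TreeMap<>(); // sorted, like map output
        for (String key : mapOutputKeys) {
            combined.merge(key, 1, Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<String> words = List.of("to", "be", "or", "not", "to", "be");
        Map<String, Integer> combined = combine(words);
        // prints "6 records before, 4 after: {be=2, not=1, or=1, to=2}"
        System.out.println(words.size() + " records before, "
                + combined.size() + " after: " + combined);
    }
}
```

On real data with many repeated keys per map task, the reduction is far larger than the 6-to-4 shown here.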
(5) The Reducer interface
The user implements the Reducer interface to write a custom Reducer. The method that must be implemented is
void reduce(K2 key,
Iterator<V2> values,
OutputCollector<K3,V3> output,
Reporter reporter
) throws IOException
Hadoop itself ships with several ready-made Reducers, for example IdentityReducer and LongSumReducer.
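The Iterator<V2> parameter means the reducer receives all values grouped under one key and consumes them one by one. The plain-Java sketch below models a summing reducer for word count. The nested `Collector` interface is a hypothetical stand-in for OutputCollector, and the Reporter parameter is omitted.

```java
import java.util.Arrays;
import java.util.Iterator;

class SumReducerSketch {
    // Hypothetical stand-in for Hadoop's OutputCollector<K3, V3>.
    interface Collector<K, V> {
        void collect(K key, V value);
    }

    // Models reduce(K2 key, Iterator<V2> values, OutputCollector<K3,V3> output, ...):
    // consume every value for this key and emit one (key, sum) pair.
    static void reduce(String key, Iterator<Integer> values, Collector<String, Integer> output) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        output.collect(key, sum);
    }

    public static void main(String[] args) {
        // prints: to<TAB>2
        reduce("to", Arrays.asList(1, 1).iterator(),
               (k, v) -> System.out.println(k + "\t" + v));
    }
}
```

The same code works unchanged whether the values are raw 1s from the Mapper or partial sums produced by a Combiner.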
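(6) OutputFormat
The user specifies the content format of the output files through an OutputFormat. Unlike InputFormat, it has no notion of splits. Each reduce task writes its data to its own file named part-nnnnn, where nnnnn is the ID of that reduce task.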
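The part-nnnnn naming amounts to zero-padding the reduce task ID to five digits. The one-method sketch below shows this; the helper name `partFileName` is hypothetical.

```java
class PartFileName {
    // Models the reduce-output naming: "part-" plus the reduce task ID,
    // zero-padded to five digits.
    static String partFileName(int reduceTaskId) {
        return String.format("part-%05d", reduceTaskId);
    }

    public static void main(String[] args) {
        for (int id = 0; id < 3; id++) {
            System.out.println(partFileName(id)); // part-00000, part-00001, part-00002
        }
    }
}
```

A job with N reduce tasks therefore produces N files, one per partition, and the file number matches the partition number returned by the Partitioner.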
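3. Distributed Cache
Hadoop comes with a distributed cache, the DistributedCache class, which makes it easy for map tasks, or reduce tasks, to share information. For example, if in some application every map task needs to read the same configuration file or dictionary, that file can be placed in the distributed cache.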
4. Writing MapReduce Jobs in Multiple Languages
Hadoop is written in Java, so it natively supports jobs written in Java. In practice, however, you sometimes have to write MapReduce jobs in C/C++ or another language, for example because you depend on a non-Java third-party library. In that case you may need some of the tools Hadoop provides.
To write MapReduce jobs in C/C++, you can use Hadoop Streaming or Hadoop Pipes.
To write MapReduce jobs in Python, you can use Hadoop Streaming or Pydoop.
For other languages such as shell, PHP, or Ruby, you can use Hadoop Streaming.
For Hadoop Streaming programming, see my blog post "Hadoop Streaming Programming".