Hadoop初探(持续更新~)

始于2020 05~

Posted by Tao on May 26, 2020

最近开始接触分布式系统,使用和本地文件系统有很大不同,记录一些使用方法以备后续使用~

总体介绍

安装

暂时没有多机实验,先跳过~

HDFS

HDFS文件系统本身为了大数据的分布式处理,所以设计理念是一次写大文件,多次读取。本身不太适合用于小文件。

  • 类似于shell的方式
  • tensorflow自带(底层是用了c的接口,可以实现类似于本地文件的使用效果,比如随机读写)
  • java里提供的读写

shell-like

参考链接

hadoop fs -ls(r) hdfs://
hadoop fs -get/put hdfs://
hadoop fs -du (-h) hdfs://
hadoop fs -rm (-r) hdfs://
hadoop fs -touchz hdfs:// # create empty file
hadoop fs -text hdfs:// # dispaly a file including zip and TextRecordInputStream 

分布式处理程序demo

统计词数

# upload file input.txt
hadoop fs -put input.txt ${tao}/input.txt

hadoop fs -test -e "${tao}/hadoop_test/output" && hadoop fs -rm -r "${tao}/hadoop_test/output"
hadoop jar /opt/tiger/yarn_deploy/hadoop-2.6.0-cdh5.4.4/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
-D mapreduce.job.name=cat2wc_xx \
-D yarn.cluster.name='xx' \
-D mapreduce.job.queuename='xx' \
-input ${tao}/hadoop_test/input.txt \
-output ${tao}/hadoop_test/output \
-mapper /bin/cat \
-reducer /usr/bin/wc

数字统计

一些配置可以通过-D来指定。

# 删除输出路径
hadoop fs -test -e "${output}" && hadoop fs -rm -r "${output}"

hadoop jar "${hadoopJar}" \
-D mapred.task.timeout=36000000 \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.partitioner.options='-k1,1' \
-D mapred.text.key.comparator.options='-k1,1n' \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.reduce.tasks=3 \
-D mapred.compress.map.output=true\
-D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-D mapred.job.queuename="xxx" \
-jobconf mapred.job.name="number count" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-mapper "python count_map.py"  \
-reducer "python count_reduce.py" \
-file "count_map.py" \
-file "count_reduce.py" \
-input "${input}" \
-output "${output}"
# count_map.py
#!/usr/bin/env python
#coding:utf-8
import re
import sys

def funcMap(fd) :

    line = fd.readline()
    while line :
        num = line.strip()
        if num.isdigit() : #非半角空白字符
            sys.stdout.write("{}\t{}\n".format(num, 1))
        line = fd.readline()

if __name__ == "__main__" :
    funcMap(sys.stdin)
# count_reduce.py
#!/usr/bin/env python
#coding:utf-8
import re
import sys

def funcReduce(fd) :

    numCnt = 0
    preNum = ""

    line = fd.readline()
    while line :
        line = line.strip()
        items = re.split("\s+", line)
        curNum, curCnt = items[0], int(items[1])
        if curNum == preNum :
            numCnt += curCnt
        else :
            if preNum != "" :
                sys.stdout.write("{}\t{}\n".format(preNum, numCnt))
            preNum = curNum
            numCnt = curCnt

        line = fd.readline()

    if preNum != "" :
        sys.stdout.write("{}\t{}\n".format(preNum, numCnt))

if __name__ == "__main__" :

    funcReduce(sys.stdin)

高级用法

打包程序

conda新建环境,安装好依赖,本地用sort加上pipeline调试,确认无误后将环境的文件夹用tar打包为tgz格式文件。注意打包后里面的路径没有问题。

中文分词

#Only the code base is uploaded from local.
CODE_DIR=`pwd`

CODEBASE_NAME=`basename ${CODE_DIR}`
ENV=hdfs://xxx/hadoop-usage/jieba_split/fenci.tgz

hadoop jar xx/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
-D mapreduce.job.name=fenci_xx \
-D yarn.cluster.name=xx \
-D mapreduce.job.queuename=xxx \
-D mapreduce.job.reduces=0 \
-files ../${CODEBASE_NAME} \
-archives ${ENV} \
-cmdenv LD_LIBRARY_PATH=fenci.tgz/lib:$LD_LIBRARY_PATH \
-cmdenv PYTHONPATH=${CODEBASE_NAME}:$PYTHONPATH \
-input ${tao}/hadoop_test/zh.txt \
-output ${tao}/hadoop_test/output \
-mapper "fenci.tgz/bin/python ${CODEBASE_NAME}/mapper.py"
import sys
import jieba


for line in sys.stdin:
    line = ' '.join(jieba.cut(line.strip()))
    sys.stdout.write(f'{line}\n')

使用技巧心得