最近开始接触分布式系统,使用和本地文件系统有很大不同,记录一些使用方法以备后续使用~
总体介绍
安装
暂时没有多机实验,先跳过~
HDFS
HDFS文件系统本身为了大数据的分布式处理,所以设计理念是一次写大文件,多次读取。本身不太适合用于小文件。
- 类似于shell的方式
- tensorflow自带(底层是用了c的接口,可以实现类似于本地文件的使用效果,比如随机读写)
- java里提供的读写
shell-like
hadoop fs -ls(r) hdfs://
hadoop fs -get/put hdfs://
hadoop fs -du (-h) hdfs://
hadoop fs -rm (-r) hdfs://
hadoop fs -touchz hdfs:// # create empty file
hadoop fs -text hdfs:// # dispaly a file including zip and TextRecordInputStream
分布式处理程序demo
统计词数
# upload file input.txt
hadoop fs -put input.txt ${tao}/input.txt
hadoop fs -test -e "${tao}/hadoop_test/output" && hadoop fs -rm -r "${tao}/hadoop_test/output"
hadoop jar /opt/tiger/yarn_deploy/hadoop-2.6.0-cdh5.4.4/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
-D mapreduce.job.name=cat2wc_xx \
-D yarn.cluster.name='xx' \
-D mapreduce.job.queuename='xx' \
-input ${tao}/hadoop_test/input.txt \
-output ${tao}/hadoop_test/output \
-mapper /bin/cat \
-reducer /usr/bin/wc
数字统计
一些配置可以通过-D来指定。
# 删除输出路径
hadoop fs -test -e "${output}" && hadoop fs -rm -r "${output}"
hadoop jar "${hadoopJar}" \
-D mapred.task.timeout=36000000 \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.partitioner.options='-k1,1' \
-D mapred.text.key.comparator.options='-k1,1n' \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.reduce.tasks=3 \
-D mapred.compress.map.output=true\
-D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-D mapred.job.queuename="xxx" \
-jobconf mapred.job.name="number count" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-mapper "python count_map.py" \
-reducer "python count_reduce.py" \
-file "count_map.py" \
-file "count_reduce.py" \
-input "${input}" \
-output "${output}"
# count_map.py
#!/usr/bin/env python
#coding:utf-8
import re
import sys
def funcMap(fd) :
line = fd.readline()
while line :
num = line.strip()
if num.isdigit() : #非半角空白字符
sys.stdout.write("{}\t{}\n".format(num, 1))
line = fd.readline()
if __name__ == "__main__" :
funcMap(sys.stdin)
# count_reduce.py
#!/usr/bin/env python
#coding:utf-8
import re
import sys
def funcReduce(fd) :
numCnt = 0
preNum = ""
line = fd.readline()
while line :
line = line.strip()
items = re.split("\s+", line)
curNum, curCnt = items[0], int(items[1])
if curNum == preNum :
numCnt += curCnt
else :
if preNum != "" :
sys.stdout.write("{}\t{}\n".format(preNum, numCnt))
preNum = curNum
numCnt = curCnt
line = fd.readline()
if preNum != "" :
sys.stdout.write("{}\t{}\n".format(preNum, numCnt))
if __name__ == "__main__" :
funcReduce(sys.stdin)
高级用法
打包程序
conda新建环境,安装好依赖,本地用sort加上pipeline调试,确认无误后将环境的文件夹用tar打包为tgz格式文件。注意打包后里面的路径没有问题。
中文分词
#Only the code base is uploaded from local.
CODE_DIR=`pwd`
CODEBASE_NAME=`basename ${CODE_DIR}`
ENV=hdfs://xxx/hadoop-usage/jieba_split/fenci.tgz
hadoop jar xx/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
-D mapreduce.job.name=fenci_xx \
-D yarn.cluster.name=xx \
-D mapreduce.job.queuename=xxx \
-D mapreduce.job.reduces=0 \
-files ../${CODEBASE_NAME} \
-archives ${ENV} \
-cmdenv LD_LIBRARY_PATH=fenci.tgz/lib:$LD_LIBRARY_PATH \
-cmdenv PYTHONPATH=${CODEBASE_NAME}:$PYTHONPATH \
-input ${tao}/hadoop_test/zh.txt \
-output ${tao}/hadoop_test/output \
-mapper "fenci.tgz/bin/python ${CODEBASE_NAME}/mapper.py"
import sys
import jieba
for line in sys.stdin:
line = ' '.join(jieba.cut(line.strip()))
sys.stdout.write(f'{line}\n')