我有一个将文件作为输入并给出输出文件的外部程序

     //for example  
     input file: IN_FILE 
     output file: OUT_FILE 
 
    //Run External program  
     ./vx < ${IN_FILE} > ${OUT_FILE} 

我想要HDFS中的输入和输出文件

我有 8 个节点的集群。我有 8 个输入文件,每个文件有 1 行

    //1 input file :       1.txt  
           1:0,0,0 
    //2 input file :       2.txt  
           2:0,0,128 
    //3 input file :       3.txt  
           3:0,128,0 
    //5 input file :       4.txt  
           4:0,128,128 
    //5 input file :       5.txt  
           5:128,0,0 
    //6 input file :       6.txt  
           6:128,0,128 
    //7 input file :       7.txt  
           7:128,128,0 
    //8 input file :       8.txt  
           8:128,128,128 

我正在使用 KeyValueTextInputFormat

               key :file name 
               value: initial coordinates 

例如第5个文件

              key :5 
              value:128,0,0 

每个 map task 根据其初始坐标生成大量数据。

现在我想在每个 map task 中运行外部程序并生成输出文件。

但我对如何处理 HDFS 中的文件感到困惑。

         I can use zero reducer and create file in HDFS  
 
         Configuration conf = new Configuration(); 
         FileSystem fs = FileSystem.get(conf); 
         Path outFile; 
         outFile = new Path(INPUT_FILE_NAME); 
         FSDataOutputStream out = fs.create(outFile); 
 
         //generating data ........ and writing to HDFS  
          out.writeUTF(lon + ";" + lat + ";" + depth + ";"); 

我很困惑如何在不将文件放入本地目录的情况下使用 HDFS 文件运行外部程序。

  with  dfs -get  

在不使用 MR 的情况下,我使用 shell 脚本得到如下结果

#!/bin/bash 
 
if [ $# -lt 2 ]; then 
    printf "Usage: %s: <infile> <outfile> \n" $(basename $0) >&2 
          exit 1 
fi 
 
IN_FILE=/Users/x34/data/$1 
OUT_FILE=/Users/x34/data/$2                      
 
cd "/Users/x34/Projects/externalprogram/model/" 
 
./vx < ${IN_FILE} > ${OUT_FILE} 
 
paste ${IN_FILE} ${OUT_FILE} | awk '{print $1,"\t",$2,"\t",$3,"\t",$4,"\t",$5,"\t",$22,"\t",$23,"\t",$24}' > /Users/x34/data/combined 
if [ $? -ne 0 ]; then 
    exit 1 
fi                       
 
exit 0 

然后我运行它

         ProcessBuilder pb = new ProcessBuilder("SHELL_SCRIPT","in", "out");  
         Process p = pb.start(); 

如果知道如何使用 hadoop 流或任何其他方式运行外部程序,我将不胜感激。我希望 HDFS 中的 INPUT 和 OUTPUT 文件进行进一步处理。

请帮忙

请您参考如下方法:

因此假设您的外部程序不知道如何识别或读取 hdfs,那么您要做的就是从 java 加载文件并将其作为输入直接传递给程序

Path path = new Path("hdfs/path/to/input/file"); 
FileSystem fs = FileSystem.get(configuration); 
FSDataInputStream fin = fs.open(path); 
ProcessBuilder pb = new ProcessBuilder("SHELL_SCRIPT"); 
Process p = pb.start(); 
OutputStream os = p.getOutputStream(); 
BufferedReader br = new BufferedReader(new InputStreamReader(fin)); 
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(os)); 
 
String line = null; 
while ((line = br.readLine())!=null){ 
    writer.write(line); 
} 

可以反向进行输出。从进程中获取 InputStream,并制作一个 FSDataOutputStream 以写入 hdfs。

从本质上讲,您的程序与这两个东西一起成为一个适配器,将 HDFS 转换为输入并输出回 HDFS。


评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!