有人设法在 Windows 上使用 Spark 的 DataFrame 编写文件(尤其是 CSV)吗?

许多关于 SO 的答案已经过时(例如 this one ),因为自 2.0 版以来,Sparks 具有编写 .CSV 的 native 功能(以及统一的 write() 方法)。另外,我下载并添加了 winutils.exe 就像提议的 here 一样。

代码:

// reading works just fine 
val df = spark.read 
             .option("header", true) 
             .option("inferSchema", true) 
             .csv("file:///C:/tmp/in.csv") 
// writing fails, none of these work 
df.write.csv("file:///C:/tmp/out.csv") 
df.write.csv("C:/tmp/out.csv") 

错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted. 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) 
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:551) 
    at prost.ebtl.load.DataSourceCSV$.loadFromFilesystem(DataSourceCSV.scala:12) 
    at TestScala$$anonfun$main$2.apply(TestScala.scala:98) 
    at TestScala$$anonfun$main$2.apply(TestScala.scala:80) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) 
    at TestScala$.main(TestScala.scala:80) 
    at TestScala.main(TestScala.scala) 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 13, 192.168.56.1): java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor; 
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method) 
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559) 
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219) 
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209) 
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:305) 
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:294) 
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:326) 
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:393) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787) 
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:132) 
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVRelation.scala:191) 
    at org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:169) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143) 
    ... 27 more 
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor; 
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method) 
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559) 
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219) 
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209) 
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:305) 
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:294) 
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:326) 
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:393) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787) 
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:132) 
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVRelation.scala:191) 
    at org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:169) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

注意:尽管创建了一个名为 out.csv 的文件夹

设置:Hadoop v.2.7.3、Spark 2.0.1 Intelli J IDEA 2016.2、Scala 2.11.8、Win7 工作站上的 Testcluster

请您参考如下方法:

我试过这个,它的工作。您需要设置仓库目录配置。这是您的代码中唯一缺少的东西,您是否对您尝试写入的目录具有写入权限。

val spark = SparkSession 
    .builder() 
    .appName("Spark SQL CSV example") 
    .master("local") 
    .config("spark.sql.warehouse.dir", "file:///C:/IJava/") 
    .getOrCreate() 
 
  val df = spark.read 
    .option("header", true) 
    .option("inferSchema", true) 
    .csv("file:///C:/Users/sankar/Downloads/FLinsurancesample.csv") 
 
  df.write.csv("file:///C:/Users/sankar/Downloads/out.csv") 


评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!