python - Apache Spark: load files from HDFS -


I've written a simple Python program, sum.py, that loads the directory data from HDFS and adds up the first-column numbers of each CSV file in that directory. The code is shown as follows:

import os, sys, inspect, csv

### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]

### Set up environment variables.
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark-1.4.0")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir

### Set up the PySpark directory path.
#pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../python")))
pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
sys.path.append(pyspark_dir)

### Import PySpark.
from pyspark import SparkConf, SparkContext

### myfunc adds up the numbers in the first column of a CSV file.
def myfunc(s):
    total = 0
    if s.endswith(".csv"):
        #s_new = os.path.realpath(os.path.abspath(os.path.join(data_path, s)))
        #cr = csv.reader(open(s_new, "rb"))
        cr = csv.reader(open(s, "rb"))
        for row in cr:
            total += int(row[0])
    print "The total number of ", s, " is: ", total
    return total

def main():
    ### Initialize SparkConf and SparkContext.
    conf = SparkConf().setAppName("ruofan").setMaster("local")
    sc = SparkContext(conf = conf)

    ### Load the data from HDFS.
    datafile = sc.wholeTextFiles("hdfs://localhost:9000/data")

    ### Send the application to each of the slave nodes.
    temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:')))

if __name__ == "__main__":
    main()
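For context on the API used above: sc.wholeTextFiles returns an RDD of (path, content) pairs, where content already holds the full text of each file. The following is a minimal sketch (separate from sum.py; the app name "demo" is illustrative) that just prints what those pairs look like:

from pyspark import SparkConf, SparkContext

### Minimal sketch (not part of sum.py): inspect the (path, content) pairs
### that wholeTextFiles returns.
conf = SparkConf().setAppName("demo").setMaster("local")
sc = SparkContext(conf = conf)

### Each element is a (path, content) tuple; content is the whole file as a
### single string, fetched from HDFS by Spark itself.
pairs = sc.wholeTextFiles("hdfs://localhost:9000/data")
for path, content in pairs.collect():
    print path, len(content)

sc.stop()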

Before running the code, I put the directory data into my local HDFS. Then I typed the following command:

$ hdfs dfs -ls /data 

The output is:

Found 2 items
-rw-r--r--   1 ying supergroup          6 2015-07-23 15:46 /data/test1.csv
-rw-r--r--   1 ying supergroup          6 2015-07-23 15:46 /data/test2.csv

So the output shows that the data is already in HDFS. Then I ran the code sum.py by typing $ python sum.py. However, it shows the following error:

IOError: [Errno 2] No such file or directory: 'hdfs://localhost:9000/data/test1.csv'

The following is the traceback of the code:

file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main     process()   file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process     serializer.dump_stream(func(split_index, iterator), outfile)   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func     return func(split, prev_func(split, iterator))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func     return func(split, prev_func(split, iterator))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func     return func(split, prev_func(split, iterator))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 304, in func     return f(iterator)   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 719, in processpartition     f(x)   file "/home/ying/aws_tutorial/spark_codes/sum.py", line 41, in <lambda>     temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:')))   file "/home/ying/aws_tutorial/spark_codes/sum.py", line 26, in myfunc     cr = csv.reader(open(s,"rb")) ioerror: [errno 2] no such file or directory: 'hdfs://localhost:9000/data/test1.csv'      @ org.apache.spark.api.python.pythonrdd$$anon$1.read(pythonrdd.scala:138)     @ org.apache.spark.api.python.pythonrdd$$anon$1.<init>(pythonrdd.scala:179)     @ org.apache.spark.api.python.pythonrdd.compute(pythonrdd.scala:97)     @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:277)     @ org.apache.spark.rdd.rdd.iterator(rdd.scala:244)     @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:63)     @ org.apache.spark.scheduler.task.run(task.scala:70)     @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:213)     @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145)     @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615)     @ java.lang.thread.run(thread.java:745) 15/07/23 18:01:49 warn tasksetmanager: lost task 0.0 in stage 0.0 (tid 0, localhost): org.apache.spark.api.python.pythonexception: traceback (most recent call last):   file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main     process()   file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process     serializer.dump_stream(func(split_index, iterator), outfile)   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func     return func(split, prev_func(split, iterator))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func     return func(split, prev_func(split, iterator))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func     return func(split, prev_func(split, iterator))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 304, in func     return f(iterator)   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 719, in processpartition     f(x)   file "/home/ying/aws_tutorial/spark_codes/sum.py", line 41, in <lambda>     temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:')))   file "/home/ying/aws_tutorial/spark_codes/sum.py", line 26, in myfunc     cr = csv.reader(open(s,"rb")) ioerror: [errno 2] no such file or directory: 'hdfs://localhost:9000/data/test1.csv'      @ 
org.apache.spark.api.python.pythonrdd$$anon$1.read(pythonrdd.scala:138)     @ org.apache.spark.api.python.pythonrdd$$anon$1.<init>(pythonrdd.scala:179)     @ org.apache.spark.api.python.pythonrdd.compute(pythonrdd.scala:97)     @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:277)     @ org.apache.spark.rdd.rdd.iterator(rdd.scala:244)     @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:63)     @ org.apache.spark.scheduler.task.run(task.scala:70)     @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:213)     @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145)     @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615)     @ java.lang.thread.run(thread.java:745)  15/07/23 18:01:49 error tasksetmanager: task 0 in stage 0.0 failed 1 times; aborting job 15/07/23 18:01:49 info taskschedulerimpl: removed taskset 0.0, tasks have completed, pool  15/07/23 18:01:49 info taskschedulerimpl: cancelling stage 0 15/07/23 18:01:49 info dagscheduler: resultstage 0 (foreach @ /home/ying/aws_tutorial/spark_codes/sum.py:41) failed in 0.392 s 15/07/23 18:01:49 info dagscheduler: job 0 failed: foreach @ /home/ying/aws_tutorial/spark_codes/sum.py:41, took 0.438280 s traceback (most recent call last):   file "/home/ying/aws_tutorial/spark_codes/sum.py", line 45, in <module>     main()   file "/home/ying/aws_tutorial/spark_codes/sum.py", line 41, in main     temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:')))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 721, in foreach     self.mappartitions(processpartition).count()  # force evaluation   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 972, in count     return self.mappartitions(lambda i: [sum(1 _ in i)]).sum()   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 963, in sum     return self.mappartitions(lambda x: [sum(x)]).reduce(operator.add)   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 771, in reduce     vals = self.mappartitions(func).collect()   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 745, in collect     port = self.ctx._jvm.pythonrdd.collectandserve(self._jrdd.rdd())   file "/usr/local/lib/python2.7/dist-packages/py4j-0.8.2.1-py2.7.egg/py4j/java_gateway.py", line 538, in __call__     self.target_id, self.name)   file "/usr/local/lib/python2.7/dist-packages/py4j-0.8.2.1-py2.7.egg/py4j/protocol.py", line 300, in get_return_value     format(target_id, '.', name), value) py4j.protocol.py4jjavaerror: error occurred while calling z:org.apache.spark.api.python.pythonrdd.collectandserve. 
: org.apache.spark.sparkexception: job aborted due stage failure: task 0 in stage 0.0 failed 1 times, recent failure: lost task 0.0 in stage 0.0 (tid 0, localhost): org.apache.spark.api.python.pythonexception: traceback (most recent call last):   file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main     process()   file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process     serializer.dump_stream(func(split_index, iterator), outfile)   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func     return func(split, prev_func(split, iterator))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func     return func(split, prev_func(split, iterator))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func     return func(split, prev_func(split, iterator))   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 304, in func     return f(iterator)   file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 719, in processpartition     f(x)   file "/home/ying/aws_tutorial/spark_codes/sum.py", line 41, in <lambda>     temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:')))   file "/home/ying/aws_tutorial/spark_codes/sum.py", line 26, in myfunc     cr = csv.reader(open(s,"rb")) ioerror: [errno 2] no such file or directory: 'hdfs://localhost:9000/data/test1.csv'      @ org.apache.spark.api.python.pythonrdd$$anon$1.read(pythonrdd.scala:138)     @ org.apache.spark.api.python.pythonrdd$$anon$1.<init>(pythonrdd.scala:179)     @ org.apache.spark.api.python.pythonrdd.compute(pythonrdd.scala:97)     @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:277)     @ org.apache.spark.rdd.rdd.iterator(rdd.scala:244)     @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:63)     @ org.apache.spark.scheduler.task.run(task.scala:70)     @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:213)     @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145)     @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615)     @ java.lang.thread.run(thread.java:745)  driver stacktrace:     @ org.apache.spark.scheduler.dagscheduler.org$apache$spark$scheduler$dagscheduler$$failjobandindependentstages(dagscheduler.scala:1266)     @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1257)     @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1256)     @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59)     @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:47)     @ org.apache.spark.scheduler.dagscheduler.abortstage(dagscheduler.scala:1256)     @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:730)     @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:730)     @ scala.option.foreach(option.scala:236)     @ org.apache.spark.scheduler.dagscheduler.handletasksetfailed(dagscheduler.scala:730)     @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1450)     @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1411)     @ org.apache.spark.util.eventloop$$anon$1.run(eventloop.scala:48) 
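The innermost frame above is the open(s, "rb") call inside myfunc. As a small local illustration (reusing the URI from the error message purely for demonstration), Python's built-in open() only resolves local filesystem paths, so an hdfs:// URI behaves like a missing local file:

### Built-in open() knows nothing about HDFS: the URI is treated as a local
### relative path, so it fails with the same IOError seen in the traceback.
try:
    open("hdfs://localhost:9000/data/test1.csv", "rb")
except IOError as e:
    print e   # [Errno 2] No such file or directory: 'hdfs://localhost:9000/data/test1.csv'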

I'm sure the master name and port are localhost:9000, but I have no idea what causes the error in the code. I would appreciate it if anyone could help me fix this problem.
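For reference, the following is a minimal sketch (it is not sum.py; the helper name sum_first_column is purely illustrative) of summing the first column directly from the content strings that wholeTextFiles returns, so nothing has to be reopened on the workers. It assumes the same hdfs://localhost:9000/data directory, Python 2, and Spark 1.4:

import csv
from StringIO import StringIO
from pyspark import SparkConf, SparkContext

### Hypothetical helper (not part of sum.py): parse one file's text in memory.
def sum_first_column(path, content):
    total = 0
    if path.endswith(".csv"):
        for row in csv.reader(StringIO(content)):
            if row:
                total += int(row[0])
    return (path, total)

conf = SparkConf().setAppName("sum-sketch").setMaster("local")
sc = SparkContext(conf = conf)

### wholeTextFiles already ships each file's text to the workers as `content`,
### so no open() call on an hdfs:// URI is needed.
datafile = sc.wholeTextFiles("hdfs://localhost:9000/data")
totals = datafile.map(lambda (path, content): sum_first_column(path, content))
print totals.collect()

sc.stop()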

