python - Apache Spark: load files from HDFS
I've written a simple Python script, sum.py, which loads the directory data from HDFS and then adds up the numbers in the first column of each csv file in that directory. The code is as follows:
import os, sys, inspect, csv

### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]

### Set up environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark-1.4.0")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir

### Set up pyspark directory path
#pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../python")))
pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
sys.path.append(pyspark_dir)

### Import pyspark
from pyspark import SparkConf, SparkContext

### myfunc: add up the numbers in the first column of a csv file.
def myfunc(s):
    total = 0
    if s.endswith(".csv"):
        #s_new = os.path.realpath(os.path.abspath(os.path.join(data_path, s)))
        #cr = csv.reader(open(s_new, "rb"))
        cr = csv.reader(open(s, "rb"))
        for row in cr:
            total += int(row[0])
        print "The total number of", s, "is:", total
    return total

def main():
    ### Initialize SparkConf and SparkContext
    conf = SparkConf().setAppName("ruofan").setMaster("local")
    sc = SparkContext(conf=conf)

    ### Load the data directory from HDFS
    datafile = sc.wholeTextFiles("hdfs://localhost:9000/data")

    ### Send the application to each of the slave nodes
    temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:')))

if __name__ == "__main__":
    main()
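For context on what I expect foreach to receive: the PySpark documentation describes sc.wholeTextFiles() as returning an RDD of (path, content) pairs, where path is the full URI of each file and content is the whole file as a single string. A minimal, illustrative snippet (not part of sum.py; the variable names are only for illustration) to inspect that structure:

### Illustrative only: look at the first (path, content) pair returned by wholeTextFiles.
pairs = sc.wholeTextFiles("hdfs://localhost:9000/data")
first_path, first_content = pairs.take(1)[0]
print "path:   ", first_path       # e.g. hdfs://localhost:9000/data/test1.csv
print "content:", first_content    # the entire csv file as one string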
Before running the code, I put the directory data into my local HDFS. Then I typed the following command:
$ hdfs dfs -ls /data
The output is:
Found 2 items
-rw-r--r--   1 ying supergroup          6 2015-07-23 15:46 /data/test1.csv
-rw-r--r--   1 ying supergroup          6 2015-07-23 15:46 /data/test2.csv
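(For completeness: I copied the directory into HDFS with the standard HDFS shell beforehand; the commands were along these lines, with the local path being illustrative.)

$ hdfs dfs -mkdir /data
$ hdfs dfs -put ./data/*.csv /data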
So the output shows that the directory data has been stored in HDFS. I then ran my code sum.py by typing:

$ python sum.py

However, it shows the following error:
IOError: [Errno 2] No such file or directory: 'hdfs://localhost:9000/data/test1.csv'
The following is the full traceback of the code:
file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 304, in func return f(iterator) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 719, in processpartition f(x) file "/home/ying/aws_tutorial/spark_codes/sum.py", line 41, in <lambda> temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:'))) file "/home/ying/aws_tutorial/spark_codes/sum.py", line 26, in myfunc cr = csv.reader(open(s,"rb")) ioerror: [errno 2] no such file or directory: 'hdfs://localhost:9000/data/test1.csv' @ org.apache.spark.api.python.pythonrdd$$anon$1.read(pythonrdd.scala:138) @ org.apache.spark.api.python.pythonrdd$$anon$1.<init>(pythonrdd.scala:179) @ org.apache.spark.api.python.pythonrdd.compute(pythonrdd.scala:97) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:277) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:244) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:63) @ org.apache.spark.scheduler.task.run(task.scala:70) @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:213) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615) @ java.lang.thread.run(thread.java:745) 15/07/23 18:01:49 warn tasksetmanager: lost task 0.0 in stage 0.0 (tid 0, localhost): org.apache.spark.api.python.pythonexception: traceback (most recent call last): file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 304, in func return f(iterator) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 719, in processpartition f(x) file "/home/ying/aws_tutorial/spark_codes/sum.py", line 41, in <lambda> temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:'))) file "/home/ying/aws_tutorial/spark_codes/sum.py", line 26, in myfunc cr = csv.reader(open(s,"rb")) ioerror: [errno 2] no such file or directory: 'hdfs://localhost:9000/data/test1.csv' @ org.apache.spark.api.python.pythonrdd$$anon$1.read(pythonrdd.scala:138) @ org.apache.spark.api.python.pythonrdd$$anon$1.<init>(pythonrdd.scala:179) @ 
org.apache.spark.api.python.pythonrdd.compute(pythonrdd.scala:97) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:277) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:244) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:63) @ org.apache.spark.scheduler.task.run(task.scala:70) @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:213) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615) @ java.lang.thread.run(thread.java:745) 15/07/23 18:01:49 error tasksetmanager: task 0 in stage 0.0 failed 1 times; aborting job 15/07/23 18:01:49 info taskschedulerimpl: removed taskset 0.0, tasks have completed, pool 15/07/23 18:01:49 info taskschedulerimpl: cancelling stage 0 15/07/23 18:01:49 info dagscheduler: resultstage 0 (foreach @ /home/ying/aws_tutorial/spark_codes/sum.py:41) failed in 0.392 s 15/07/23 18:01:49 info dagscheduler: job 0 failed: foreach @ /home/ying/aws_tutorial/spark_codes/sum.py:41, took 0.438280 s traceback (most recent call last): file "/home/ying/aws_tutorial/spark_codes/sum.py", line 45, in <module> main() file "/home/ying/aws_tutorial/spark_codes/sum.py", line 41, in main temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:'))) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 721, in foreach self.mappartitions(processpartition).count() # force evaluation file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 972, in count return self.mappartitions(lambda i: [sum(1 _ in i)]).sum() file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 963, in sum return self.mappartitions(lambda x: [sum(x)]).reduce(operator.add) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 771, in reduce vals = self.mappartitions(func).collect() file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 745, in collect port = self.ctx._jvm.pythonrdd.collectandserve(self._jrdd.rdd()) file "/usr/local/lib/python2.7/dist-packages/py4j-0.8.2.1-py2.7.egg/py4j/java_gateway.py", line 538, in __call__ self.target_id, self.name) file "/usr/local/lib/python2.7/dist-packages/py4j-0.8.2.1-py2.7.egg/py4j/protocol.py", line 300, in get_return_value format(target_id, '.', name), value) py4j.protocol.py4jjavaerror: error occurred while calling z:org.apache.spark.api.python.pythonrdd.collectandserve. 
: org.apache.spark.sparkexception: job aborted due stage failure: task 0 in stage 0.0 failed 1 times, recent failure: lost task 0.0 in stage 0.0 (tid 0, localhost): org.apache.spark.api.python.pythonexception: traceback (most recent call last): file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 304, in func return f(iterator) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 719, in processpartition f(x) file "/home/ying/aws_tutorial/spark_codes/sum.py", line 41, in <lambda> temp = datafile.foreach(lambda (path, content): myfunc(str(path).strip('file:'))) file "/home/ying/aws_tutorial/spark_codes/sum.py", line 26, in myfunc cr = csv.reader(open(s,"rb")) ioerror: [errno 2] no such file or directory: 'hdfs://localhost:9000/data/test1.csv' @ org.apache.spark.api.python.pythonrdd$$anon$1.read(pythonrdd.scala:138) @ org.apache.spark.api.python.pythonrdd$$anon$1.<init>(pythonrdd.scala:179) @ org.apache.spark.api.python.pythonrdd.compute(pythonrdd.scala:97) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:277) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:244) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:63) @ org.apache.spark.scheduler.task.run(task.scala:70) @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:213) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615) @ java.lang.thread.run(thread.java:745) driver stacktrace: @ org.apache.spark.scheduler.dagscheduler.org$apache$spark$scheduler$dagscheduler$$failjobandindependentstages(dagscheduler.scala:1266) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1257) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1256) @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59) @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:47) @ org.apache.spark.scheduler.dagscheduler.abortstage(dagscheduler.scala:1256) @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:730) @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:730) @ scala.option.foreach(option.scala:236) @ org.apache.spark.scheduler.dagscheduler.handletasksetfailed(dagscheduler.scala:730) @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1450) @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1411) @ org.apache.spark.util.eventloop$$anon$1.run(eventloop.scala:48)
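Reading the traceback, the exception is raised inside myfunc at csv.reader(open(s, "rb")) on the worker, not by Spark's HDFS loading itself. As far as I understand, Python's built-in open() only knows local filesystem paths, so the same IOError shows up even in a plain interpreter with no Spark involved (illustrative):

>>> open("hdfs://localhost:9000/data/test1.csv", "rb")
IOError: [Errno 2] No such file or directory: 'hdfs://localhost:9000/data/test1.csv'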
I'm sure the master name and port are localhost:9000, but I have no idea how to fix this error in my code. I would really appreciate it if anyone could help me fix the problem.
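One direction I am considering (an untested sketch; the helper name sum_first_column is mine, not from any library): since wholeTextFiles() already delivers each file's contents to the workers as a string, the csv parsing could be done on that string via StringIO instead of re-opening the hdfs:// path with open():

from StringIO import StringIO
import csv

### Untested sketch: parse the content string that wholeTextFiles already provides,
### instead of calling open() on an hdfs:// path.
def sum_first_column(path, content):
    total = 0
    if path.endswith(".csv"):
        for row in csv.reader(StringIO(content.encode("utf-8"))):
            total += int(row[0])
    return total

datafile = sc.wholeTextFiles("hdfs://localhost:9000/data")
totals = datafile.map(lambda (path, content): (path, sum_first_column(path, content)))
print totals.collect()

I'm not sure whether this is the right approach, so any advice would be appreciated.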