Apache Spark 中 RDD 分區的數量是如何決定的?
問題
- Spark的分區數量是如何決定的?
- 我是否需要在某處明確指定可用 CPU 核心的數量,以便分區數量相同(例如並行化方法的 numPartition arg,但每當核心數量發生變化時需要更新程序)?
背景
在環境中安裝了 Spark 集群,沒有更改 spark-env.sh、spark-defaults.conf 文件或程序中的 SparkConf 對象。
對於 N Queen 程序,分區數為 2,僅分配一個節點任務。對於字數統計程序,分區數為 22,任務分配給所有節點。對這兩個程序都使用了 spark-submit。
程式
N皇后
val sparkConf = new SparkConf().setAppName("NQueen").set("spark.files.overwrite", "true") val sc = new SparkContext(sparkConf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) def isSafe(column: Int, placement: List[Int]): Boolean = { ... } def placeQueensAt(row: Int, placements: Set[List[Int]]): Set[List[Int]] = { ... } val initial = sc.parallelize(queensAtFirst) //val initial = sc.parallelize(queensAtFirst, 12) println("Partitions = %d".format(initial.partitions.size)) val result = initial.flatMap(x => placeQueensAt(1, Set(x))).collect()
字數
val sparkConf = new SparkConf().setAppName("WordCount").set("spark.files.overwrite", "true") val sc = new SparkContext(sparkConf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) val lines = sc.textFile("hdfs:/user/wynadmin/sfpd.csv") println("Patitions = %d".format(lines.partitions.size)) val words = for (line <- lines; word <- line.split(",") if word.toLowerCase.matches("[a-z]+")) yield (word, 1) val counts = words.reduceByKey(_ + _)
環境
Ubuntu 14.04 上的 Spark 2.0.1(3 個節點,每個節點有 4 個 CPU)。
獨立部署(不是 YARN 也不是 Mesos)
在How-to: Tune Your Apache Spark Jobs (Part 2)中找到資訊。
這個數字是如何確定的?上一篇文章中描述了 Spark 將 RDD 分組為階段的方式。(快速提醒一下,像 repartition 和 reduceByKey 這樣的轉換會導致階段邊界。)階段中的任務數與階段中最後一個 RDD 中的分區數相同。RDD 中的分區數與其所依賴的 RDD 中的分區數相同,但有幾個例外:coalesce 轉換允許創建一個分區少於其父 RDD 的 RDD,union 轉換創建一個具有其父母的分區數的總和,笛卡爾用他們的產品創建一個RDD。
沒有父母的RDD怎麼辦?由 textFile 或 hadoopFile 生成的 RDD 的分區由使用的底層 MapReduce InputFormat 確定。通常,每個被讀取的 HDFS 塊都會有一個分區。由 parallelize 生成的 RDD 的分區來自使用者給定的參數,或者 spark.default.parallelism 如果沒有給出。
spark.default.parallelism 選項修復了症狀。
--conf spark.default.parallelism=24
設置為 12(與核心數相同)會導致節點使用不均。