Introduction
I wanted to get a rough idea of what PySpark is and to try it out on macOS, so I gave it a go.
Spark
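Software for large-scale data processing (in the documentation's words, a unified analytics engine). It can run on a single server, but combined with a cluster manager such as YARN or Kubernetes (K8s) it can distribute the processing across multiple servers.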
PySpark
Spark itself is implemented in Scala; PySpark is the Python API that makes it possible to use Spark from Python.
Installing Spark
Spark can be installed with Homebrew. The required files are placed under /usr/local/Cellar/apache-spark/3.2.1/. The pyspark command is also placed under this directory, so PySpark can be used as well.
% brew install apache-spark
openjdk@11 is also installed as a dependency.
==> Downloading https://ghcr.io/v2/homebrew/core/openjdk/11/manifests/11.0.14.1
Already downloaded: /Users/daisukeshimizu/Library/Caches/Homebrew/downloads/234ae4acbb1a62a43741a4774e7b414ce200de929b1ea9d67058e97e9b55d8f7--openjdk@11-11.0.14.1.bottle_manifest.json
==> Downloading https://ghcr.io/v2/homebrew/core/openjdk/11/blobs/sha256:44d92f169bc3ab7dfe2645dfdef33128bbe610f6aab84fc85a8cc9fb832de281
Already downloaded: /Users/daisukeshimizu/Library/Caches/Homebrew/downloads/93bc1716317946ab4887e71c68011db6367ed6e006f2ec3d111edbd226e1c77a--openjdk@11--11.0.14.1.catalina.bottle.tar.gz
==> Downloading https://ghcr.io/v2/homebrew/core/apache-spark/manifests/3.2.1
Already downloaded: /Users/daisukeshimizu/Library/Caches/Homebrew/downloads/900859faa97b53bf17aba5521cb831ed3cb04ea7d25d6b657a67e3d89ac9b002--apache-spark-3.2.1.bottle_manifest.json
==> Downloading https://ghcr.io/v2/homebrew/core/apache-spark/blobs/sha256:3f8f3309dfef579496100aad618c4f46b55795497fe259f2accb0da0c15da7ec
Already downloaded: /Users/daisukeshimizu/Library/Caches/Homebrew/downloads/b91e2c88cd7ad0a9782efaab00ddabaebaeecbfe4043558b7f68d26caada17ed--apache-spark--3.2.1.all.bottle.tar.gz
==> Installing dependencies for apache-spark: openjdk@11
==> Installing apache-spark dependency: openjdk@11
==> Pouring openjdk@11--11.0.14.1.catalina.bottle.tar.gz
🍺 /usr/local/Cellar/openjdk@11/11.0.14.1: 678 files, 298.3MB
==> Installing apache-spark
==> Pouring apache-spark--3.2.1.all.bottle.tar.gz
🍺 /usr/local/Cellar/apache-spark/3.2.1: 1,472 files, 322MB
==> Running `brew cleanup apache-spark`...
Disable this behaviour by setting HOMEBREW_NO_INSTALL_CLEANUP.
Hide these hints with HOMEBREW_NO_ENV_HINTS (see `man brew`).
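After the install, a quick way to confirm that the commands ended up on PATH is to look them up from Python. This is only a minimal sketch: the helper name `find_spark_commands` is made up for illustration, and it assumes nothing beyond the standard library's `shutil.which`.

```python
import shutil


def find_spark_commands(names=("pyspark", "spark-submit")):
    """Map each command name to its resolved path, or None if it is not on PATH."""
    return {name: shutil.which(name) for name in names}


if __name__ == "__main__":
    for name, path in find_spark_commands().items():
        print(f"{name}: {path or 'not found on PATH'}")
```

If either command is reported as not found, the Homebrew bin directory is probably missing from PATH in the current shell.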
Starting PySpark
Run the pyspark command.
% pyspark
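Output like the following appears and the Python-based Spark shell starts.
As the startup message Spark context available as 'sc' (master = local[*], app id = local-**********). indicates, Spark runs on its own on the local machine, with as many worker threads as there are logical cores.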
Python 3.7.9 (v3.7.9:13c94747c7, Aug 15 2020, 01:31:08)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/Cellar/apache-spark/3.2.1/libexec/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/12 19:26:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Python version 3.7.9 (v3.7.9:13c94747c7, Aug 15 2020 01:31:08)
Spark context Web UI available at http://192.168.0.110:4040
Spark context available as 'sc' (master = local[*], app id = local-1660991198151).
SparkSession available as 'spark'.
>>>
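To make the meaning of master = local[*] concrete, here is a small plain-Python sketch of how the simple local master strings map to a thread count: local means one thread, local[K] means K threads, and local[*] means one per logical core. The function name `local_thread_count` is hypothetical, and this is not Spark's own parsing code. It also uses `os.cpu_count()` as a stand-in for the core count the JVM would report, which is an assumption.

```python
import os
import re


def local_thread_count(master):
    """Worker-thread count implied by a simple local master string (illustrative only)."""
    if master == "local":
        return 1  # a bare "local" runs with a single worker thread
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master string: {master!r}")
    if match.group(1) == "*":
        # Assumption: os.cpu_count() approximates the logical cores Spark would see.
        return os.cpu_count() or 1
    return int(match.group(1))


if __name__ == "__main__":
    for master in ("local", "local[2]", "local[*]"):
        print(master, "->", local_thread_count(master))
```

Other master strings (for example a YARN or Kubernetes master) are deliberately rejected here, since they do not describe a local thread count at all.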
Working through the tutorial
I work through the following tutorial and watch how things behave.
Quick Start - Spark 3.5.2 Documentation
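It starts with something like a word count.
Tutorial (Basics)
Spark has the concept of a Dataset, a distributed collection of items. The first step is to create one. Reading the tutorial, though, Python is dynamically typed, so a Dataset apparently does not need to be strongly typed in Python. As a result, it is called a DataFrame, to match the data frame concept in Pandas and R.
Create a new DataFrame from the text of the README.md file in the Spark source directory. When working with Spark directly rather than through PySpark, this would presumably be a Dataset rather than a DataFrame.
When Spark is installed with Homebrew on macOS, the Spark source directory is under /usr/local/Cellar/apache-spark/.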
>>> textFile = spark.read.text("/usr/local/Cellar/apache-spark/3.2.1/README.md")
Count the number of lines.
>>> textFile.count()
109
Print the first line.
>>> textFile.first()
Row(value='# Apache Spark')
Count the lines that contain the string Spark.
>>> textFile.filter(textFile.value.contains("Spark")).count()
19
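For intuition, the three operations above can be mirrored in plain Python on an ordinary list of strings. This is a sketch only: the sample lines are made up and are not the real README contents, and nothing here is distributed, which is the whole point of doing it in Spark instead.

```python
# Hypothetical sample standing in for the README (one string per line of text).
lines = [
    "# Apache Spark",
    "",
    "Spark is a unified analytics engine for large-scale data processing.",
    "It provides high-level APIs in Scala, Java, Python, and R.",
]

# Roughly what textFile.count() reports: the number of rows (lines).
line_count = len(lines)

# Roughly what textFile.first() returns, minus the Row wrapper.
first_line = lines[0]

# Roughly what filter(value.contains("Spark")).count() reports:
# the number of lines with a case-sensitive substring match.
spark_lines = sum(1 for line in lines if "Spark" in line)

print(line_count, repr(first_line), spark_lines)
```

In the DataFrame version each line is a Row with a single value column, which is why the filter is written against textFile.value.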
Tutorial (running a PySpark application)
Run a PySpark application with spark-submit.
Create SimpleApp.py and save it in any directory.
It counts how many lines of /usr/local/Cellar/apache-spark/3.2.1/README.md contain the character a and how many contain the character b.
from pyspark.sql import SparkSession

# Text file to scan (the README that ships with the Homebrew install).
logFile = "/usr/local/Cellar/apache-spark/3.2.1/README.md"
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
# Cache the DataFrame because it is scanned twice below.
logData = spark.read.text(logFile).cache()

# Number of lines containing 'a' and number of lines containing 'b'.
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()
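One detail worth spelling out: contains('a') is evaluated once per line, so numAs and numBs are numbers of lines, not numbers of characters. The plain-Python sketch below shows the difference on made-up sample data. The helper names `count_lines_containing` and `count_occurrences` are hypothetical and are not part of PySpark.

```python
def count_lines_containing(lines, needle):
    """Number of lines in which needle appears at least once."""
    return sum(1 for line in lines if needle in line)


def count_occurrences(lines, needle):
    """Total number of times needle appears across all lines."""
    return sum(line.count(needle) for line in lines)


sample = ["banana", "cab", "xyz"]  # hypothetical data

print(count_lines_containing(sample, "a"))  # 2 ("banana" and "cab")
print(count_occurrences(sample, "a"))       # 4 (3 in "banana", 1 in "cab")
```

SimpleApp.py corresponds to the first helper; counting individual characters would need a different query.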
Run it.
% spark-submit --master local[*] SimpleApp.py
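A flood of output follows. Reading it closely would probably give a better picture of what the Executor is doing, but I will set that aside for now. Near the end, Lines with a: 65, lines with b: 33 is printed.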
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/Cellar/apache-spark/3.2.1/libexec/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/06/12 22:04:07 INFO SparkContext: Running Spark version 3.2.1
22/06/12 22:04:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/12 22:04:08 INFO ResourceUtils: ==============================================================
22/06/12 22:04:08 INFO ResourceUtils: No custom resources configured for spark.driver.
22/06/12 22:04:08 INFO ResourceUtils: ==============================================================
22/06/12 22:04:08 INFO SparkContext: Submitted application: SimpleApp
22/06/12 22:04:08 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/06/12 22:04:08 INFO ResourceProfile: Limiting resource is cpu
22/06/12 22:04:08 INFO ResourceProfileManager: Added ResourceProfile id: 0
22/06/12 22:04:08 INFO SecurityManager: Changing view acls to: daisukeshimizu
22/06/12 22:04:08 INFO SecurityManager: Changing modify acls to: daisukeshimizu
22/06/12 22:04:08 INFO SecurityManager: Changing view acls groups to:
22/06/12 22:04:08 INFO SecurityManager: Changing modify acls groups to:
22/06/12 22:04:08 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(daisukeshimizu); groups with view permissions: Set(); users with modify permissions: Set(daisukeshimizu); groups with modify permissions: Set()
22/06/12 22:04:08 INFO Utils: Successfully started service 'sparkDriver' on port 60424.
22/06/12 22:04:08 INFO SparkEnv: Registering MapOutputTracker
22/06/12 22:04:08 INFO SparkEnv: Registering BlockManagerMaster
22/06/12 22:04:08 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/06/12 22:04:08 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/06/12 22:04:09 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
22/06/12 22:04:09 INFO DiskBlockManager: Created local directory at /private/var/folders/6_/yxwcl8kn6dz_fpt7r0z1qd8h0000gn/T/blockmgr-d53cb198-ba7d-4cd9-9d5e-2d751b8e1185
22/06/12 22:04:09 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
22/06/12 22:04:09 INFO SparkEnv: Registering OutputCommitCoordinator
22/06/12 22:04:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/06/12 22:04:09 INFO Utils: Successfully started service 'SparkUI' on port 4041.
22/06/12 22:04:09 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.110:4041
22/06/12 22:04:09 INFO Executor: Starting executor ID driver on host 192.168.0.110
22/06/12 22:04:09 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 60425.
22/06/12 22:04:09 INFO NettyBlockTransferService: Server created on 192.168.0.110:60425
22/06/12 22:04:09 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/06/12 22:04:09 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.110, 60425, None)
22/06/12 22:04:09 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.110:60425 with 434.4 MiB RAM, BlockManagerId(driver, 192.168.0.110, 60425, None)
22/06/12 22:04:09 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.110, 60425, None)
22/06/12 22:04:09 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.110, 60425, None)
22/06/12 22:04:10 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
22/06/12 22:04:10 INFO SharedState: Warehouse path is 'file:/Users/daisukeshimizu/local/bin/spark-warehouse'.
22/06/12 22:04:12 INFO InMemoryFileIndex: It took 53 ms to list leaf files for 1 paths.
22/06/12 22:04:14 INFO FileSourceStrategy: Pushed Filters:
22/06/12 22:04:14 INFO FileSourceStrategy: Post-Scan Filters:
22/06/12 22:04:14 INFO FileSourceStrategy: Output Data Schema: struct<value: string>
22/06/12 22:04:16 INFO CodeGenerator: Code generated in 271.021986 ms
22/06/12 22:04:16 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 189.9 KiB, free 434.2 MiB)
22/06/12 22:04:16 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 32.8 KiB, free 434.2 MiB)
22/06/12 22:04:16 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.110:60425 (size: 32.8 KiB, free: 434.4 MiB)
22/06/12 22:04:16 INFO SparkContext: Created broadcast 0 from count at NativeMethodAccessorImpl.java:0
22/06/12 22:04:16 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
22/06/12 22:04:16 INFO DefaultCachedBatchSerializer: Predicate isnotnull(value#0) generates partition filter: ((value.count#37 - value.nullCount#36) > 0)
22/06/12 22:04:16 INFO DAGScheduler: Registering RDD 8 (count at NativeMethodAccessorImpl.java:0) as input to shuffle 0
22/06/12 22:04:16 INFO DAGScheduler: Got map stage job 0 (count at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/06/12 22:04:16 INFO DAGScheduler: Final stage: ShuffleMapStage 0 (count at NativeMethodAccessorImpl.java:0)
22/06/12 22:04:16 INFO DAGScheduler: Parents of final stage: List()
22/06/12 22:04:16 INFO DAGScheduler: Missing parents: List()
22/06/12 22:04:16 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[8] at count at NativeMethodAccessorImpl.java:0), which has no missing parents
22/06/12 22:04:17 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 25.5 KiB, free 434.2 MiB)
22/06/12 22:04:17 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 11.8 KiB, free 434.1 MiB)
22/06/12 22:04:17 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.0.110:60425 (size: 11.8 KiB, free: 434.4 MiB)
22/06/12 22:04:17 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1478
22/06/12 22:04:17 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[8] at count at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/06/12 22:04:17 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
22/06/12 22:04:17 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.110, executor driver, partition 0, PROCESS_LOCAL, 4867 bytes) taskResourceAssignments Map()
22/06/12 22:04:17 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
22/06/12 22:04:17 INFO FileScanRDD: Reading File path: file:///usr/local/Cellar/apache-spark/3.2.1/README.md, range: 0-4512, partition values: [empty row]
22/06/12 22:04:17 INFO CodeGenerator: Code generated in 16.078328 ms
22/06/12 22:04:17 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 5.1 KiB, free 434.1 MiB)
22/06/12 22:04:17 INFO BlockManagerInfo: Added rdd_3_0 in memory on 192.168.0.110:60425 (size: 5.1 KiB, free: 434.4 MiB)
22/06/12 22:04:17 INFO CodeGenerator: Code generated in 7.043371 ms
22/06/12 22:04:17 INFO CodeGenerator: Code generated in 87.238152 ms
22/06/12 22:04:17 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2158 bytes result sent to driver
22/06/12 22:04:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 739 ms on 192.168.0.110 (executor driver) (1/1)
22/06/12 22:04:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
22/06/12 22:04:18 INFO DAGScheduler: ShuffleMapStage 0 (count at NativeMethodAccessorImpl.java:0) finished in 1.028 s
22/06/12 22:04:18 INFO DAGScheduler: looking for newly runnable stages
22/06/12 22:04:18 INFO DAGScheduler: running: Set()
22/06/12 22:04:18 INFO DAGScheduler: waiting: Set()
22/06/12 22:04:18 INFO DAGScheduler: failed: Set()
22/06/12 22:04:18 INFO CodeGenerator: Code generated in 20.556764 ms
22/06/12 22:04:18 INFO SparkContext: Starting job: count at NativeMethodAccessorImpl.java:0
22/06/12 22:04:18 INFO DAGScheduler: Got job 1 (count at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/06/12 22:04:18 INFO DAGScheduler: Final stage: ResultStage 2 (count at NativeMethodAccessorImpl.java:0)
22/06/12 22:04:18 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
22/06/12 22:04:18 INFO DAGScheduler: Missing parents: List()
22/06/12 22:04:18 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[11] at count at NativeMethodAccessorImpl.java:0), which has no missing parents
22/06/12 22:04:18 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 11.0 KiB, free 434.1 MiB)
22/06/12 22:04:18 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 5.5 KiB, free 434.1 MiB)
22/06/12 22:04:18 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.0.110:60425 (size: 5.5 KiB, free: 434.3 MiB)
22/06/12 22:04:18 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1478
22/06/12 22:04:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[11] at count at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/06/12 22:04:18 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks resource profile 0
22/06/12 22:04:18 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 1) (192.168.0.110, executor driver, partition 0, NODE_LOCAL, 4453 bytes) taskResourceAssignments Map()
22/06/12 22:04:18 INFO Executor: Running task 0.0 in stage 2.0 (TID 1)
22/06/12 22:04:18 INFO ShuffleBlockFetcherIterator: Getting 1 (60.0 B) non-empty blocks including 1 (60.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
22/06/12 22:04:18 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 48 ms
22/06/12 22:04:18 INFO Executor: Finished task 0.0 in stage 2.0 (TID 1). 2691 bytes result sent to driver
22/06/12 22:04:18 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 1) in 277 ms on 192.168.0.110 (executor driver) (1/1)
22/06/12 22:04:18 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
22/06/12 22:04:18 INFO DAGScheduler: ResultStage 2 (count at NativeMethodAccessorImpl.java:0) finished in 0.343 s
22/06/12 22:04:18 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
22/06/12 22:04:18 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
22/06/12 22:04:18 INFO DAGScheduler: Job 1 finished: count at NativeMethodAccessorImpl.java:0, took 0.443001 s
22/06/12 22:04:18 INFO DefaultCachedBatchSerializer: Predicate isnotnull(value#0) generates partition filter: ((value.count#68 - value.nullCount#67) > 0)
22/06/12 22:04:18 INFO DAGScheduler: Registering RDD 16 (count at NativeMethodAccessorImpl.java:0) as input to shuffle 1
22/06/12 22:04:18 INFO DAGScheduler: Got map stage job 2 (count at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/06/12 22:04:18 INFO DAGScheduler: Final stage: ShuffleMapStage 3 (count at NativeMethodAccessorImpl.java:0)
22/06/12 22:04:18 INFO DAGScheduler: Parents of final stage: List()
22/06/12 22:04:18 INFO DAGScheduler: Missing parents: List()
22/06/12 22:04:18 INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[16] at count at NativeMethodAccessorImpl.java:0), which has no missing parents
22/06/12 22:04:18 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 25.5 KiB, free 434.1 MiB)
22/06/12 22:04:18 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 11.8 KiB, free 434.1 MiB)
22/06/12 22:04:18 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.0.110:60425 (size: 11.8 KiB, free: 434.3 MiB)
22/06/12 22:04:18 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1478
22/06/12 22:04:18 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (MapPartitionsRDD[16] at count at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/06/12 22:04:18 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks resource profile 0
22/06/12 22:04:18 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 2) (192.168.0.110, executor driver, partition 0, PROCESS_LOCAL, 4867 bytes) taskResourceAssignments Map()
22/06/12 22:04:18 INFO Executor: Running task 0.0 in stage 3.0 (TID 2)
22/06/12 22:04:18 INFO BlockManager: Found block rdd_3_0 locally
22/06/12 22:04:18 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 2115 bytes result sent to driver
22/06/12 22:04:18 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 41 ms on 192.168.0.110 (executor driver) (1/1)
22/06/12 22:04:18 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
22/06/12 22:04:18 INFO DAGScheduler: ShuffleMapStage 3 (count at NativeMethodAccessorImpl.java:0) finished in 0.066 s
22/06/12 22:04:18 INFO DAGScheduler: looking for newly runnable stages
22/06/12 22:04:18 INFO DAGScheduler: running: Set()
22/06/12 22:04:18 INFO DAGScheduler: waiting: Set()
22/06/12 22:04:18 INFO DAGScheduler: failed: Set()
22/06/12 22:04:19 INFO SparkContext: Starting job: count at NativeMethodAccessorImpl.java:0
22/06/12 22:04:19 INFO DAGScheduler: Got job 3 (count at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/06/12 22:04:19 INFO DAGScheduler: Final stage: ResultStage 5 (count at NativeMethodAccessorImpl.java:0)
22/06/12 22:04:19 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 4)
22/06/12 22:04:19 INFO DAGScheduler: Missing parents: List()
22/06/12 22:04:19 INFO DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[19] at count at NativeMethodAccessorImpl.java:0), which has no missing parents
22/06/12 22:04:19 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 11.0 KiB, free 434.1 MiB)
22/06/12 22:04:19 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 5.5 KiB, free 434.1 MiB)
22/06/12 22:04:19 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.0.110:60425 (size: 5.5 KiB, free: 434.3 MiB)
22/06/12 22:04:19 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1478
22/06/12 22:04:19 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[19] at count at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/06/12 22:04:19 INFO TaskSchedulerImpl: Adding task set 5.0 with 1 tasks resource profile 0
22/06/12 22:04:19 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 3) (192.168.0.110, executor driver, partition 0, NODE_LOCAL, 4453 bytes) taskResourceAssignments Map()
22/06/12 22:04:19 INFO Executor: Running task 0.0 in stage 5.0 (TID 3)
22/06/12 22:04:19 INFO ShuffleBlockFetcherIterator: Getting 1 (60.0 B) non-empty blocks including 1 (60.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
22/06/12 22:04:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
22/06/12 22:04:19 INFO Executor: Finished task 0.0 in stage 5.0 (TID 3). 2648 bytes result sent to driver
22/06/12 22:04:19 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 3) in 19 ms on 192.168.0.110 (executor driver) (1/1)
22/06/12 22:04:19 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
22/06/12 22:04:19 INFO DAGScheduler: ResultStage 5 (count at NativeMethodAccessorImpl.java:0) finished in 0.046 s
22/06/12 22:04:19 INFO DAGScheduler: Job 3 is finished. Cancelling potential speculative or zombie tasks for this job
22/06/12 22:04:19 INFO TaskSchedulerImpl: Killing all running tasks in stage 5: Stage finished
22/06/12 22:04:19 INFO DAGScheduler: Job 3 finished: count at NativeMethodAccessorImpl.java:0, took 0.063246 s
Lines with a: 65, lines with b: 33
22/06/12 22:04:19 INFO SparkUI: Stopped Spark web UI at http://192.168.0.110:4041
22/06/12 22:04:19 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/06/12 22:04:19 INFO MemoryStore: MemoryStore cleared
22/06/12 22:04:19 INFO BlockManager: BlockManager stopped
22/06/12 22:04:19 INFO BlockManagerMaster: BlockManagerMaster stopped
22/06/12 22:04:19 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/06/12 22:04:19 INFO SparkContext: Successfully stopped SparkContext
22/06/12 22:04:19 INFO ShutdownHookManager: Shutdown hook called
22/06/12 22:04:19 INFO ShutdownHookManager: Deleting directory /private/var/folders/6_/yxwcl8kn6dz_fpt7r0z1qd8h0000gn/T/spark-52635c0d-f1a3-4957-bf76-4a989d7ac08b
22/06/12 22:04:19 INFO ShutdownHookManager: Deleting directory /private/var/folders/6_/yxwcl8kn6dz_fpt7r0z1qd8h0000gn/T/spark-a66a2146-3e22-4894-b63f-7631ae8d86b1
22/06/12 22:04:19 INFO ShutdownHookManager: Deleting directory /private/var/folders/6_/yxwcl8kn6dz_fpt7r0z1qd8h0000gn/T/spark-a66a2146-3e22-4894-b63f-7631ae8d86b1/pyspark-f67202d9-534f-4e01-8d2f-8434baa5d3ae
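Since the application's own print output is buried in that flood, one option is to filter it out after the fact. The sketch below assumes the combined output has been saved with one entry per line and that Spark's entries start with a yy/MM/dd HH:mm:ss timestamp and a level, as in the run above. The names `LOG_LINE` and `application_output` are hypothetical.

```python
import re

# Matches the prefix of Spark log entries as they appear in the run above,
# e.g. "22/06/12 22:04:19 INFO ".
LOG_LINE = re.compile(
    r"^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} (TRACE|DEBUG|INFO|WARN|ERROR|FATAL) "
)


def application_output(lines):
    """Keep only lines that look like neither Spark log entries nor JVM warnings."""
    return [
        line
        for line in lines
        if line.strip()
        and not LOG_LINE.match(line)
        and not line.startswith("WARNING: ")
        and not line.startswith("Using Spark's default log4j profile")
    ]


sample = [
    "22/06/12 22:04:19 INFO DAGScheduler: Job 3 finished: count at NativeMethodAccessorImpl.java:0, took 0.063246 s",
    "Lines with a: 65, lines with b: 33",
    "22/06/12 22:04:19 INFO SparkContext: Successfully stopped SparkContext",
]
print(application_output(sample))
```

This is a heuristic on the text only. An application line that happens to start with the same timestamp pattern would be dropped too.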
Summary
I wanted to try running PySpark on macOS, so I installed it with Homebrew and worked through the tutorial. Without also understanding how cluster mode works, it may not really click.