自由帳

Rambling study notes, mostly about system infrastructure and operations for web services.

Trying PySpark on macOS

Introduction

I wanted a rough idea of what PySpark is, so I tried it out on macOS.

Spark

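Software for large-scale data processing (a "unified analytics engine", in the documentation's words). It can run on a single server, but combined with a cluster manager such as YARN or Kubernetes (K8s) it can distribute processing across multiple servers.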

spark.apache.org

PySpark

Spark itself is implemented in Scala. PySpark is the Python API that makes it possible to run Spark from Python.

spark.apache.org

Installing Spark

Spark can be installed with Homebrew.

The required files are placed under /usr/local/Cellar/apache-spark/3.2.1/. The pyspark command is also placed under this directory, so PySpark is available as well.

 % brew install apache-spark

openjdk@11 is also installed as a dependency.

==> Downloading https://ghcr.io/v2/homebrew/core/openjdk/11/manifests/11.0.14.1
Already downloaded: /Users/daisukeshimizu/Library/Caches/Homebrew/downloads/234ae4acbb1a62a43741a4774e7b414ce200de929b1ea9d67058e97e9b55d8f7--openjdk@11-11.0.14.1.bottle_manifest.json
==> Downloading https://ghcr.io/v2/homebrew/core/openjdk/11/blobs/sha256:44d92f169bc3ab7dfe2645dfdef33128bbe610f6aab84fc85a8cc9fb832de281
Already downloaded: /Users/daisukeshimizu/Library/Caches/Homebrew/downloads/93bc1716317946ab4887e71c68011db6367ed6e006f2ec3d111edbd226e1c77a--openjdk@11--11.0.14.1.catalina.bottle.tar.gz
==> Downloading https://ghcr.io/v2/homebrew/core/apache-spark/manifests/3.2.1
Already downloaded: /Users/daisukeshimizu/Library/Caches/Homebrew/downloads/900859faa97b53bf17aba5521cb831ed3cb04ea7d25d6b657a67e3d89ac9b002--apache-spark-3.2.1.bottle_manifest.json
==> Downloading https://ghcr.io/v2/homebrew/core/apache-spark/blobs/sha256:3f8f3309dfef579496100aad618c4f46b55795497fe259f2accb0da0c15da7ec
Already downloaded: /Users/daisukeshimizu/Library/Caches/Homebrew/downloads/b91e2c88cd7ad0a9782efaab00ddabaebaeecbfe4043558b7f68d26caada17ed--apache-spark--3.2.1.all.bottle.tar.gz
==> Installing dependencies for apache-spark: openjdk@11
==> Installing apache-spark dependency: openjdk@11
==> Pouring openjdk@11--11.0.14.1.catalina.bottle.tar.gz
🍺  /usr/local/Cellar/openjdk@11/11.0.14.1: 678 files, 298.3MB
==> Installing apache-spark
==> Pouring apache-spark--3.2.1.all.bottle.tar.gz
🍺  /usr/local/Cellar/apache-spark/3.2.1: 1,472 files, 322MB
==> Running `brew cleanup apache-spark`...
Disable this behaviour by setting HOMEBREW_NO_INSTALL_CLEANUP.
Hide these hints with HOMEBREW_NO_ENV_HINTS (see `man brew`).
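As a quick sanity check after the install, the following minimal sketch looks up the Spark commands on PATH from Python. It assumes Homebrew has linked pyspark and spark-submit onto PATH; the helper name find_spark_commands is made up for this note.

```python
import shutil


def find_spark_commands(names=("pyspark", "spark-submit")):
    """Map each command name to its resolved path, or None if it is not on PATH."""
    return {name: shutil.which(name) for name in names}


if __name__ == "__main__":
    for name, path in find_spark_commands().items():
        print(f"{name}: {path or 'not found on PATH'}")
```

If a command is reported as not found, the Homebrew bin directory is probably missing from PATH.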

Starting PySpark

Run the pyspark command.

% pyspark 

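A Python-based Spark shell starts, printing output like the following. As the startup message Spark context available as 'sc' (master = local[*], app id = local-**********). indicates, Spark runs standalone on the local machine, with as many worker threads as there are logical cores.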

Python 3.7.9 (v3.7.9:13c94747c7, Aug 15 2020, 01:31:08)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/Cellar/apache-spark/3.2.1/libexec/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/12 19:26:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Python version 3.7.9 (v3.7.9:13c94747c7, Aug 15 2020 01:31:08)
Spark context Web UI available at http://192.168.0.110:4040
Spark context available as 'sc' (master = local[*], app id = local-1660991198151).
SparkSession available as 'spark'.
>>>
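To make the meaning of master = local[*] concrete, here is a small plain-Python sketch (not Spark code) of how the local master strings map to worker thread counts: local means one thread, local[N] means N threads, and local[*] means one thread per logical core. The function name local_worker_threads is hypothetical, and os.cpu_count() is only a stand-in for the core count that the JVM reports to Spark, so the two may differ in some environments.

```python
import os
import re


def local_worker_threads(master):
    """Return the worker thread count implied by a simple local master string."""
    if master == "local":
        return 1
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master string: {master!r}")
    value = match.group(1)
    if value == "*":
        # os.cpu_count() reports logical cores; it can return None on unusual platforms.
        return os.cpu_count() or 1
    return int(value)


print(local_worker_threads("local[*]"))
```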

Working through the tutorial

I'll follow the tutorial below and watch how things behave.

Quick Start - Spark 3.5.2 Documentation

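It starts out with something like a word count.

Tutorial (Basics)

Spark has a concept called a Dataset: a distributed collection of items. The first step is to create one. Reading the tutorial, though, Python is dynamically typed, so a Dataset apparently does not need to be strongly typed there. For that reason it is called a DataFrame in Python, matching the data frame concept in Pandas and R.

Create a new DataFrame from the text of the README.md file in the Spark source directory. When working in Spark directly (for example from Scala) rather than through PySpark, I believe this would be a Dataset instead of a DataFrame. With a Homebrew install on macOS, the Spark directory is under /usr/local/Cellar/apache-spark/.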

>>> textFile = spark.read.text("/usr/local/Cellar/apache-spark/3.2.1/README.md")

Count the number of rows.

>>> textFile.count()
109

Print the first row.

>>> textFile.first()
Row(value='# Apache Spark')

Count the number of lines that contain the string Spark.

>>> textFile.filter(textFile.value.contains("Spark")).count()
19

Tutorial (running a PySpark application)

Run a PySpark application with spark-submit.

Create SimpleApp.py and save it in any directory. It counts how many lines of /usr/local/Cellar/apache-spark/3.2.1/README.md contain the letter a and how many contain the letter b.

from pyspark.sql import SparkSession

# Path to the README.md installed by Homebrew
logFile = "/usr/local/Cellar/apache-spark/3.2.1/README.md"
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
# cache() keeps the data in memory because it is scanned twice below
logData = spark.read.text(logFile).cache()

# Count the lines that contain each letter
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()
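Note that filter(...).count() counts lines, not individual characters. The plain-Python sketch below (no Spark involved) contrasts the two on a made-up sample; both function names and the sample data are hypothetical.

```python
def count_lines_containing(lines, needle):
    """Number of lines that contain needle at least once (what filter + count reports)."""
    return sum(1 for line in lines if needle in line)


def count_occurrences(lines, needle):
    """Total number of times needle appears across all lines."""
    return sum(line.count(needle) for line in lines)


sample = ["banana bread", "apple", "cherry"]

# Two lines contain "a", but the letter itself appears five times.
print(count_lines_containing(sample, "a"), count_occurrences(sample, "a"))  # prints: 2 5
```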

Run it.

% spark-submit --master "local[*]" SimpleApp.py

A flood of output comes out. Reading it closely would probably tell me more about how the Executor behaves, but I'll set that aside for now. Near the end, Lines with a: 65, lines with b: 33 is printed.

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/Cellar/apache-spark/3.2.1/libexec/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/06/12 22:04:07 INFO SparkContext: Running Spark version 3.2.1
22/06/12 22:04:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/12 22:04:08 INFO ResourceUtils: ==============================================================
22/06/12 22:04:08 INFO ResourceUtils: No custom resources configured for spark.driver.
22/06/12 22:04:08 INFO ResourceUtils: ==============================================================
22/06/12 22:04:08 INFO SparkContext: Submitted application: SimpleApp
22/06/12 22:04:08 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/06/12 22:04:08 INFO ResourceProfile: Limiting resource is cpu
22/06/12 22:04:08 INFO ResourceProfileManager: Added ResourceProfile id: 0
22/06/12 22:04:08 INFO SecurityManager: Changing view acls to: daisukeshimizu
22/06/12 22:04:08 INFO SecurityManager: Changing modify acls to: daisukeshimizu
22/06/12 22:04:08 INFO SecurityManager: Changing view acls groups to:
22/06/12 22:04:08 INFO SecurityManager: Changing modify acls groups to:
22/06/12 22:04:08 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(daisukeshimizu); groups with view permissions: Set(); users  with modify permissions: Set(daisukeshimizu); groups with modify permissions: Set()
22/06/12 22:04:08 INFO Utils: Successfully started service 'sparkDriver' on port 60424.
22/06/12 22:04:08 INFO SparkEnv: Registering MapOutputTracker
22/06/12 22:04:08 INFO SparkEnv: Registering BlockManagerMaster
22/06/12 22:04:08 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/06/12 22:04:08 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/06/12 22:04:09 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
22/06/12 22:04:09 INFO DiskBlockManager: Created local directory at /private/var/folders/6_/yxwcl8kn6dz_fpt7r0z1qd8h0000gn/T/blockmgr-d53cb198-ba7d-4cd9-9d5e-2d751b8e1185
22/06/12 22:04:09 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
22/06/12 22:04:09 INFO SparkEnv: Registering OutputCommitCoordinator
22/06/12 22:04:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/06/12 22:04:09 INFO Utils: Successfully started service 'SparkUI' on port 4041.
22/06/12 22:04:09 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.110:4041
22/06/12 22:04:09 INFO Executor: Starting executor ID driver on host 192.168.0.110
22/06/12 22:04:09 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 60425.
22/06/12 22:04:09 INFO NettyBlockTransferService: Server created on 192.168.0.110:60425
22/06/12 22:04:09 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/06/12 22:04:09 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.110, 60425, None)
22/06/12 22:04:09 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.110:60425 with 434.4 MiB RAM, BlockManagerId(driver, 192.168.0.110, 60425, None)
22/06/12 22:04:09 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.110, 60425, None)
22/06/12 22:04:09 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.110, 60425, None)
22/06/12 22:04:10 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
22/06/12 22:04:10 INFO SharedState: Warehouse path is 'file:/Users/daisukeshimizu/local/bin/spark-warehouse'.
22/06/12 22:04:12 INFO InMemoryFileIndex: It took 53 ms to list leaf files for 1 paths.
22/06/12 22:04:14 INFO FileSourceStrategy: Pushed Filters:
22/06/12 22:04:14 INFO FileSourceStrategy: Post-Scan Filters:
22/06/12 22:04:14 INFO FileSourceStrategy: Output Data Schema: struct<value: string>
22/06/12 22:04:16 INFO CodeGenerator: Code generated in 271.021986 ms
22/06/12 22:04:16 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 189.9 KiB, free 434.2 MiB)
22/06/12 22:04:16 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 32.8 KiB, free 434.2 MiB)
22/06/12 22:04:16 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.110:60425 (size: 32.8 KiB, free: 434.4 MiB)
22/06/12 22:04:16 INFO SparkContext: Created broadcast 0 from count at NativeMethodAccessorImpl.java:0
22/06/12 22:04:16 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
22/06/12 22:04:16 INFO DefaultCachedBatchSerializer: Predicate isnotnull(value#0) generates partition filter: ((value.count#37 - value.nullCount#36) > 0)
22/06/12 22:04:16 INFO DAGScheduler: Registering RDD 8 (count at NativeMethodAccessorImpl.java:0) as input to shuffle 0
22/06/12 22:04:16 INFO DAGScheduler: Got map stage job 0 (count at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/06/12 22:04:16 INFO DAGScheduler: Final stage: ShuffleMapStage 0 (count at NativeMethodAccessorImpl.java:0)
22/06/12 22:04:16 INFO DAGScheduler: Parents of final stage: List()
22/06/12 22:04:16 INFO DAGScheduler: Missing parents: List()
22/06/12 22:04:16 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[8] at count at NativeMethodAccessorImpl.java:0), which has no missing parents
22/06/12 22:04:17 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 25.5 KiB, free 434.2 MiB)
22/06/12 22:04:17 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 11.8 KiB, free 434.1 MiB)
22/06/12 22:04:17 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.0.110:60425 (size: 11.8 KiB, free: 434.4 MiB)
22/06/12 22:04:17 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1478
22/06/12 22:04:17 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[8] at count at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/06/12 22:04:17 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
22/06/12 22:04:17 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.110, executor driver, partition 0, PROCESS_LOCAL, 4867 bytes) taskResourceAssignments Map()
22/06/12 22:04:17 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
22/06/12 22:04:17 INFO FileScanRDD: Reading File path: file:///usr/local/Cellar/apache-spark/3.2.1/README.md, range: 0-4512, partition values: [empty row]
22/06/12 22:04:17 INFO CodeGenerator: Code generated in 16.078328 ms
22/06/12 22:04:17 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 5.1 KiB, free 434.1 MiB)
22/06/12 22:04:17 INFO BlockManagerInfo: Added rdd_3_0 in memory on 192.168.0.110:60425 (size: 5.1 KiB, free: 434.4 MiB)
22/06/12 22:04:17 INFO CodeGenerator: Code generated in 7.043371 ms
22/06/12 22:04:17 INFO CodeGenerator: Code generated in 87.238152 ms
22/06/12 22:04:17 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2158 bytes result sent to driver
22/06/12 22:04:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 739 ms on 192.168.0.110 (executor driver) (1/1)
22/06/12 22:04:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
22/06/12 22:04:18 INFO DAGScheduler: ShuffleMapStage 0 (count at NativeMethodAccessorImpl.java:0) finished in 1.028 s
22/06/12 22:04:18 INFO DAGScheduler: looking for newly runnable stages
22/06/12 22:04:18 INFO DAGScheduler: running: Set()
22/06/12 22:04:18 INFO DAGScheduler: waiting: Set()
22/06/12 22:04:18 INFO DAGScheduler: failed: Set()
22/06/12 22:04:18 INFO CodeGenerator: Code generated in 20.556764 ms
22/06/12 22:04:18 INFO SparkContext: Starting job: count at NativeMethodAccessorImpl.java:0
22/06/12 22:04:18 INFO DAGScheduler: Got job 1 (count at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/06/12 22:04:18 INFO DAGScheduler: Final stage: ResultStage 2 (count at NativeMethodAccessorImpl.java:0)
22/06/12 22:04:18 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
22/06/12 22:04:18 INFO DAGScheduler: Missing parents: List()
22/06/12 22:04:18 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[11] at count at NativeMethodAccessorImpl.java:0), which has no missing parents
22/06/12 22:04:18 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 11.0 KiB, free 434.1 MiB)
22/06/12 22:04:18 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 5.5 KiB, free 434.1 MiB)
22/06/12 22:04:18 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.0.110:60425 (size: 5.5 KiB, free: 434.3 MiB)
22/06/12 22:04:18 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1478
22/06/12 22:04:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[11] at count at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/06/12 22:04:18 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks resource profile 0
22/06/12 22:04:18 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 1) (192.168.0.110, executor driver, partition 0, NODE_LOCAL, 4453 bytes) taskResourceAssignments Map()
22/06/12 22:04:18 INFO Executor: Running task 0.0 in stage 2.0 (TID 1)
22/06/12 22:04:18 INFO ShuffleBlockFetcherIterator: Getting 1 (60.0 B) non-empty blocks including 1 (60.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
22/06/12 22:04:18 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 48 ms
22/06/12 22:04:18 INFO Executor: Finished task 0.0 in stage 2.0 (TID 1). 2691 bytes result sent to driver
22/06/12 22:04:18 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 1) in 277 ms on 192.168.0.110 (executor driver) (1/1)
22/06/12 22:04:18 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
22/06/12 22:04:18 INFO DAGScheduler: ResultStage 2 (count at NativeMethodAccessorImpl.java:0) finished in 0.343 s
22/06/12 22:04:18 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
22/06/12 22:04:18 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
22/06/12 22:04:18 INFO DAGScheduler: Job 1 finished: count at NativeMethodAccessorImpl.java:0, took 0.443001 s
22/06/12 22:04:18 INFO DefaultCachedBatchSerializer: Predicate isnotnull(value#0) generates partition filter: ((value.count#68 - value.nullCount#67) > 0)
22/06/12 22:04:18 INFO DAGScheduler: Registering RDD 16 (count at NativeMethodAccessorImpl.java:0) as input to shuffle 1
22/06/12 22:04:18 INFO DAGScheduler: Got map stage job 2 (count at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/06/12 22:04:18 INFO DAGScheduler: Final stage: ShuffleMapStage 3 (count at NativeMethodAccessorImpl.java:0)
22/06/12 22:04:18 INFO DAGScheduler: Parents of final stage: List()
22/06/12 22:04:18 INFO DAGScheduler: Missing parents: List()
22/06/12 22:04:18 INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[16] at count at NativeMethodAccessorImpl.java:0), which has no missing parents
22/06/12 22:04:18 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 25.5 KiB, free 434.1 MiB)
22/06/12 22:04:18 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 11.8 KiB, free 434.1 MiB)
22/06/12 22:04:18 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.0.110:60425 (size: 11.8 KiB, free: 434.3 MiB)
22/06/12 22:04:18 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1478
22/06/12 22:04:18 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (MapPartitionsRDD[16] at count at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/06/12 22:04:18 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks resource profile 0
22/06/12 22:04:18 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 2) (192.168.0.110, executor driver, partition 0, PROCESS_LOCAL, 4867 bytes) taskResourceAssignments Map()
22/06/12 22:04:18 INFO Executor: Running task 0.0 in stage 3.0 (TID 2)
22/06/12 22:04:18 INFO BlockManager: Found block rdd_3_0 locally
22/06/12 22:04:18 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 2115 bytes result sent to driver
22/06/12 22:04:18 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 41 ms on 192.168.0.110 (executor driver) (1/1)
22/06/12 22:04:18 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
22/06/12 22:04:18 INFO DAGScheduler: ShuffleMapStage 3 (count at NativeMethodAccessorImpl.java:0) finished in 0.066 s
22/06/12 22:04:18 INFO DAGScheduler: looking for newly runnable stages
22/06/12 22:04:18 INFO DAGScheduler: running: Set()
22/06/12 22:04:18 INFO DAGScheduler: waiting: Set()
22/06/12 22:04:18 INFO DAGScheduler: failed: Set()
22/06/12 22:04:19 INFO SparkContext: Starting job: count at NativeMethodAccessorImpl.java:0
22/06/12 22:04:19 INFO DAGScheduler: Got job 3 (count at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/06/12 22:04:19 INFO DAGScheduler: Final stage: ResultStage 5 (count at NativeMethodAccessorImpl.java:0)
22/06/12 22:04:19 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 4)
22/06/12 22:04:19 INFO DAGScheduler: Missing parents: List()
22/06/12 22:04:19 INFO DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[19] at count at NativeMethodAccessorImpl.java:0), which has no missing parents
22/06/12 22:04:19 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 11.0 KiB, free 434.1 MiB)
22/06/12 22:04:19 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 5.5 KiB, free 434.1 MiB)
22/06/12 22:04:19 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.0.110:60425 (size: 5.5 KiB, free: 434.3 MiB)
22/06/12 22:04:19 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1478
22/06/12 22:04:19 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[19] at count at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/06/12 22:04:19 INFO TaskSchedulerImpl: Adding task set 5.0 with 1 tasks resource profile 0
22/06/12 22:04:19 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 3) (192.168.0.110, executor driver, partition 0, NODE_LOCAL, 4453 bytes) taskResourceAssignments Map()
22/06/12 22:04:19 INFO Executor: Running task 0.0 in stage 5.0 (TID 3)
22/06/12 22:04:19 INFO ShuffleBlockFetcherIterator: Getting 1 (60.0 B) non-empty blocks including 1 (60.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
22/06/12 22:04:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
22/06/12 22:04:19 INFO Executor: Finished task 0.0 in stage 5.0 (TID 3). 2648 bytes result sent to driver
22/06/12 22:04:19 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 3) in 19 ms on 192.168.0.110 (executor driver) (1/1)
22/06/12 22:04:19 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
22/06/12 22:04:19 INFO DAGScheduler: ResultStage 5 (count at NativeMethodAccessorImpl.java:0) finished in 0.046 s
22/06/12 22:04:19 INFO DAGScheduler: Job 3 is finished. Cancelling potential speculative or zombie tasks for this job
22/06/12 22:04:19 INFO TaskSchedulerImpl: Killing all running tasks in stage 5: Stage finished
22/06/12 22:04:19 INFO DAGScheduler: Job 3 finished: count at NativeMethodAccessorImpl.java:0, took 0.063246 s
Lines with a: 65, lines with b: 33
22/06/12 22:04:19 INFO SparkUI: Stopped Spark web UI at http://192.168.0.110:4041
22/06/12 22:04:19 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/06/12 22:04:19 INFO MemoryStore: MemoryStore cleared
22/06/12 22:04:19 INFO BlockManager: BlockManager stopped
22/06/12 22:04:19 INFO BlockManagerMaster: BlockManagerMaster stopped
22/06/12 22:04:19 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/06/12 22:04:19 INFO SparkContext: Successfully stopped SparkContext
22/06/12 22:04:19 INFO ShutdownHookManager: Shutdown hook called
22/06/12 22:04:19 INFO ShutdownHookManager: Deleting directory /private/var/folders/6_/yxwcl8kn6dz_fpt7r0z1qd8h0000gn/T/spark-52635c0d-f1a3-4957-bf76-4a989d7ac08b
22/06/12 22:04:19 INFO ShutdownHookManager: Deleting directory /private/var/folders/6_/yxwcl8kn6dz_fpt7r0z1qd8h0000gn/T/spark-a66a2146-3e22-4894-b63f-7631ae8d86b1
22/06/12 22:04:19 INFO ShutdownHookManager: Deleting directory /private/var/folders/6_/yxwcl8kn6dz_fpt7r0z1qd8h0000gn/T/spark-a66a2146-3e22-4894-b63f-7631ae8d86b1/pyspark-f67202d9-534f-4e01-8d2f-8434baa5d3ae
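One way to start digging into this output is to pull out only the lines of interest. The sketch below extracts the duration of each finished job from lines shaped like the DAGScheduler "Job N finished" messages above. The regular expression is an assumption based on this particular log, and job_durations is a hypothetical helper name.

```python
import re

# Matches lines such as "... INFO DAGScheduler: Job 1 finished: count at ..., took 0.443001 s"
JOB_FINISHED = re.compile(r"INFO DAGScheduler: Job (\d+) finished: .*, took ([\d.]+) s$")


def job_durations(log_lines):
    """Return {job_id: seconds} for every 'Job N finished' line in the log."""
    durations = {}
    for line in log_lines:
        match = JOB_FINISHED.search(line.rstrip())
        if match:
            durations[int(match.group(1))] = float(match.group(2))
    return durations


# Three lines copied from the output above.
sample_log = [
    "22/06/12 22:04:18 INFO DAGScheduler: Job 1 finished: count at NativeMethodAccessorImpl.java:0, took 0.443001 s",
    "22/06/12 22:04:19 INFO DAGScheduler: Job 3 finished: count at NativeMethodAccessorImpl.java:0, took 0.063246 s",
    "22/06/12 22:04:19 INFO SparkContext: Successfully stopped SparkContext",
]

print(job_durations(sample_log))
```

With the whole output saved to a file, the same function can be fed the file's lines instead of sample_log.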

Summary

I wanted to run PySpark on macOS, so I installed it with Homebrew and worked through the tutorial. Without also understanding how cluster mode works, it may not really click.