
Spark tuning

Spark properties provide a wide range of configuration options that can be used to fine-tune the behavior and performance of a user's Spark session. By adjusting the options mentioned below you can solve some of the commonly encountered issues when running your workloads. Some of the most commonly used Spark properties, together with recommended values for typical NXCALS applications (as defined in the so-called Flavors), are grouped in the categories below.

Executor tuning

An executor is a separate process that runs on worker nodes and performs tasks as part of a distributed computation. Executors are responsible for executing tasks assigned to them by the Spark driver program, and they store intermediate data in memory for processing.

  • spark.executor.instances (typically between 4 and 16) Number of executor instances to be launched for a Spark application in the cluster.

  • spark.executor.cores (typically between 2 and 4) Determines the number of CPU cores allocated to each executor process of a Spark application running in cluster mode. This parameter is limited by the quota available to a given user.

Note

For special use cases please contact our support so that a dedicated queue with bigger resources can be assigned or created. Both default limits, the maximum of 32 cores allocated per container on the cluster and the maximum of 100 cores allocated in total across all containers on a node, can be increased if required.

  • spark.executor.memory (typically between 2g and 6g) Amount of memory to use per executor process. When set too low, it may result in out-of-memory errors or performance degradation due to frequent data spills to disk. On the other hand, if it is set too high, it may lead to inefficient resource usage and reduced scalability.
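
The executor properties above are typically defined via the NXCALS Flavors or the SWAN session configuration, but as an illustrative sketch they could also be passed directly to a plain PySpark session builder (the application name and concrete values below are examples only, not recommendations):

from pyspark.sql import SparkSession

# Minimal sketch: executor sizing passed at session creation time.
# The app name and values are illustrative only.
spark = (
    SparkSession.builder
    .appName("nxcals-extraction")              # hypothetical application name
    .config("spark.executor.instances", "8")   # typically between 4 and 16
    .config("spark.executor.cores", "4")       # typically 2-4, limited by the user quota
    .config("spark.executor.memory", "4g")     # typically between 2g and 6g
    .getOrCreate()
)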

Driver tuning

The driver program defines the high-level logic and orchestrates the execution of tasks across multiple worker nodes in the cluster. It is responsible for coordinating the overall execution of the Spark application. It communicates with the Spark master node to request resources and monitor the progress of tasks. It reads data, performs transformations, and collects results. It also handles any errors or exceptions that occur during the execution of the Spark application.

  • spark.driver.maxResultSize (default 1g, typically set to 1t, effectively removing the limit) Limits the total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. Jobs are aborted if the total size exceeds this limit, resulting in a session disconnection error. Setting the limit too high may cause out-of-memory errors in the driver (depending on spark.driver.memory and the memory overhead of objects in the JVM), whereas a proper limit can protect the driver from out-of-memory errors.

  • spark.task.maxDirectResultSize (default 1g, max 2g since Spark 3.5.0) The threshold size beyond which results generated by a task are no longer sent directly back to the driver program. This property helps prevent the driver program from running out of memory when tasks generate large results that need to be returned to the driver.

  • spark.driver.memory (default 1g) Specifies the amount of memory to allocate for the driver program.
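
Driver-side properties such as spark.driver.memory have to be defined before the driver JVM starts, so they are set at session creation (or via spark-submit / spark-defaults) rather than changed on a running session. A minimal sketch with illustrative values only:

from pyspark.sql import SparkSession

# Minimal sketch: driver-side properties set at session creation time.
# The concrete values are examples, not recommendations for every workload.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "4g")                # default 1g
    .config("spark.driver.maxResultSize", "1t")         # effectively no limit
    .config("spark.task.maxDirectResultSize", "2047m")
    .getOrCreate()
)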

Extraction tuning

When working with arrays (which are nested structures in NXCALS), enabling the following parameters can in certain cases lead to up to a 10x performance improvement.

  • spark.sql.parquet.enableNestedColumnVectorizedReader (default false, set to true in Flavors and enabled on SWAN) Enables or disables the nested column vectorized reader when reading Parquet files with nested schemas (struct, list, map, etc.). When set to true it significantly improves performance when working with nested data structures, but it can also increase memory usage.

  • spark.sql.parquet.columnarReaderBatchSize (default 4096, can be lowered when working with arrays and experiencing memory issues, for example down to 32) Controls the number of rows included in a single batch when reading Parquet files with the vectorized reader. A higher value may increase the amount of memory consumed during the reading process (potentially resulting in out-of-memory errors for vectors), but can theoretically improve reading performance. A low value reduces memory usage but may degrade performance.
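
Both parameters are Spark SQL configurations, so (unlike the executor and driver memory settings) they can also be adjusted on an already running session before triggering the extraction. A minimal sketch, assuming an existing spark session object:

# Sketch only: tune the Parquet reader on a running session before extracting.
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "32")  # lower value to reduce memory pressure with arrays

# Optionally verify the effective values
print(spark.conf.get("spark.sql.parquet.columnarReaderBatchSize"))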

Use cases:

1. When getting an error similar to the one below while extracting VectorNumeric data:

Py4JJavaError: An error occurred while calling o231.collectToPython.: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 34.0 failed 4 times, most recent failure: Lost task 11.3 in stage 34.0 (TID 1203) (ithdp2001.cern.ch executor 57): ExecutorLostFailure (executor 57 exited caused by one of the running tasks) Reason: Container from a bad node: container_e32_1681295651861_7283_03_000012 on host: ithdp2001.cern.ch. Exit status: 143. Diagnostics: [2023-04-19 16:37:44.892]Container killed on request. Exit code is 143  
[2023-04-19 16:37:44.913]Container exited with a non-zero exit code 143.  
[2023-04-19 16:37:44.914]Killed by external signal
A solution was to set the following values:

spark.executor.memory: 20g
spark.sql.parquet.columnarReaderBatchSize: 16

2. "I'm having an error when extracting roughly a day's worth of data":

Traceback (most recent call last):
File "/home/anton/code/cern/venv/lib/python3.9/site-packages/py4j/clientserver.py", line 516, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
...
py4j.protocol.Py4JNetworkError: Error while sending or receiving
A solution for this case was to increase Spark executor memory and Spark driver memory as follows:

spark.executor.memory: 8g
spark.driver.memory: 16g

3. Getting session disconnections accompanied by the following error:

EndOfStreamException: Unable to read additional data from server sessionid 0x2037ec6c23d4240, likely server has closed socket
A solution was to change the default values for the driver result size to "unlimited":

spark.driver.maxResultSize: 1t
spark.task.maxDirectResultSize: 2047m

Note

Information on how to set Spark parameters during session creation can be found here.