nxcals.api.extraction.data.builders.DataFrame.mapInArrow
- DataFrame.mapInArrow(func: ArrowMapIterFunction, schema: Union[StructType, str], barrier: bool = False) DataFrame
Maps an iterator of batches in the current
DataFrame
using a Python native function that takes and outputs a PyArrow’s RecordBatch, and returns the result as aDataFrame
.The function should take an iterator of pyarrow.RecordBatchs and return another iterator of pyarrow.RecordBatchs. All columns are passed together as an iterator of pyarrow.RecordBatchs to the function and the returned iterator of pyarrow.RecordBatchs are combined as a
DataFrame
. Each pyarrow.RecordBatch size can be controlled by spark.sql.execution.arrow.maxRecordsPerBatch. The size of the function’s input and output can be different.New in version 3.3.0.
- Parameters:
func (function) – a Python native function that takes an iterator of pyarrow.RecordBatchs, and outputs an iterator of pyarrow.RecordBatchs.
schema (
pyspark.sql.types.DataType
or str) – the return type of the func in PySpark. The value can be either apyspark.sql.types.DataType
object or a DDL-formatted type string.barrier (bool, optional, default True) –
Use barrier mode execution.
Examples
>>> import pyarrow >>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")) >>> def filter_func(iterator): ... for batch in iterator: ... pdf = batch.to_pandas() ... yield pyarrow.RecordBatch.from_pandas(pdf[pdf.id == 1]) >>> df.mapInArrow(filter_func, df.schema).show() +---+---+ | id|age| +---+---+ | 1| 21| +---+---+
Set
barrier
toTrue
to force themapInArrow
stage running in the barrier mode, it ensures all Python workers in the stage will be launched concurrently.>>> df.mapInArrow(filter_func, df.schema, barrier=True).show() +---+---+ | id|age| +---+---+ | 1| 21| +---+---+
Notes
This API is unstable, and for developers.
See also
pyspark.sql.functions.pandas_udf
,pyspark.sql.DataFrame.mapInPandas