This tutorial walks through a complete worked example of streaming data in PySpark using the applyInPandasWithState function and foreachBatch. Spark Structured Streaming does not always provide the tools needed out of the box, and applyInPandasWithState and foreachBatch allow the streaming behaviour to be customised. The example uses the scenario of streaming data from aircraft to analyse each aircraft's health, so that the pilot and maintenance department can be notified immediately if any issues are found. The article focuses on how custom group states can be created and managed, and shows how to update a custom state and write to a Delta table each time the state changes.
applyInPandasWithState is an abstraction of the functions mapGroupsWithState and flatMapGroupsWithState, which are only available in Java and Scala. It enables the functionality of mapGroupsWithState and flatMapGroupsWithState to be used in PySpark.
Download PySpark streaming example
You can download the two notebooks for this article and adapt them for your own purposes. The first PySpark notebook sets up a set of streaming CSV files. The second PySpark notebook is an example of how to implement applyInPandasWithState.
Download PySpark applyInPandasWithState create streaming source >
Download PySpark structured streaming with applyInPandasWithState worked example >
PySpark structured streaming modes
The standard way of streaming data in PySpark is as follows. Here the readStream and writeStream functions will read a stream of CSV files in a folder and then write the output to a delta table.
# Define the path to the source data
source_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define the path to the output data
output_path = f"/Volumes/{my_catalog}/{my_schema}/output/"
# Read the CSV stream
df = (spark.readStream
    .format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(source_path)
)
# Write the stream to a Delta table
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{output_path}/_checkpoint")
    .start(output_path)
)
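By default, streaming file sources expect a schema to be supplied up front; a CSV readStream without one is rejected unless the spark.sql.streaming.schemaInference setting is enabled. A minimal sketch of passing the schema as a DDL string follows. The column names and the read_sensor_stream helper are hypothetical, chosen to fit the aircraft scenario, and are not taken from the downloadable notebooks.

```python
# Hypothetical column names for the sensor CSV files; adjust to your data.
SENSOR_SCHEMA = (
    "flight_id STRING, "
    "event_time TIMESTAMP, "
    "engine_vibration DOUBLE, "
    "fuel_flow DOUBLE"
)


def read_sensor_stream(spark, source_path):
    """Return a streaming DataFrame over the CSV files in source_path."""
    return (
        spark.readStream
        .format("csv")
        .option("header", True)
        .schema(SENSOR_SCHEMA)  # DDL string; a StructType object also works
        .load(source_path)
    )
```

Declaring the schema explicitly also keeps column types stable from one batch of files to the next.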
The stream of data can be written to the output in 3 modes depending on the desired behaviour.
- Complete mode rewrites the entire results table every time new data is processed. With millions of records, this becomes progressively slower with each new batch of data received. It is therefore only appropriate for development and testing scenarios. This is the mode that the display() function uses when displaying streamed data.
- Append mode adds new records to the results table but does not update existing records. When aggregations are performed, the results table is not updated until all the data for an aggregation group has been received. This can be an issue if updates of a running total are needed before all the data for the group has arrived.
- Update mode updates existing data but restricts the data destination. In particular, it cannot write directly to Delta tables. This is a key issue in many scenarios, as Delta tables are a very common destination.
If the data stream needs to:
- Append new records and not update existing records
- Group and aggregate data but only write to the output once all data for the group is received
- Update records to a compatible output format
Then the standard streaming modes work well. If something outside of this is needed, an arbitrary state management solution may be required. This article demonstrates how to implement an arbitrary stateful solution.
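Where update mode cannot target a Delta table directly, one common workaround is to hand each micro-batch to foreachBatch and merge it into the table. The sketch below assumes the Delta Lake Python package (delta.tables) is available on the cluster. The table name flight_status, the key column flight_id, and all three helper functions are hypothetical. It also assumes each micro-batch holds at most one row per key, since a merge fails when several source rows match the same target row.

```python
def merge_condition(key_columns, target="t", source="s"):
    """Build the ON clause that matches target rows to source rows."""
    return " AND ".join(f"{target}.{c} = {source}.{c}" for c in key_columns)


def upsert_to_delta(batch_df, batch_id):
    """Merge one micro-batch into the (hypothetical) flight_status table."""
    # Imported here so the helpers can be defined without Delta Lake installed.
    from delta.tables import DeltaTable

    target = DeltaTable.forName(batch_df.sparkSession, "flight_status")
    (
        target.alias("t")
        .merge(batch_df.alias("s"), merge_condition(["flight_id"]))
        .whenMatchedUpdateAll()      # existing flight: overwrite its row
        .whenNotMatchedInsertAll()   # new flight: insert a row
        .execute()
    )


def start_upsert_stream(df, checkpoint_path):
    """Start a streaming query that upserts every micro-batch."""
    return (
        df.writeStream
        .foreachBatch(upsert_to_delta)
        .outputMode("update")
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

Because foreachBatch receives an ordinary (non-streaming) DataFrame, any batch write that Delta supports can be used inside it.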
State Management
Streaming operations fall into two categories. In the first, no knowledge is required about any previous or future record. Each record is self-contained, and this is referred to as stateless. In the second, referred to as stateful, operations need to be applied between a record and other records, such as grouping and aggregating a set of records within a group.
To deal with the requirement to have knowledge of other records, information about them is stored in a state. Using applyInPandasWithState enables this state to be managed manually within a set framework. This enables sets of data about a group to be stored for AI algorithms or any other format of data that is needed for the particular use case.
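The distinction can be illustrated without Spark. In this plain-Python sketch, which uses made-up field names, converting a reading needs only the record itself, whereas tracking a running peak per flight needs a value remembered from earlier records.

```python
def to_kilograms(record):
    """Stateless: the result depends only on the record passed in."""
    return {**record, "fuel_kg": record["fuel_lb"] * 0.45359237}


def update_peaks(peaks, record):
    """Stateful: the result depends on what earlier records left in `peaks`."""
    flight = record["flight_id"]
    peaks[flight] = max(peaks.get(flight, float("-inf")), record["vibration"])
    return peaks[flight]


peaks = {}  # the "state": one remembered value per flight (group)
records = [
    {"flight_id": "AB123", "fuel_lb": 100.0, "vibration": 0.4},
    {"flight_id": "AB123", "fuel_lb": 90.0, "vibration": 0.9},
    {"flight_id": "AB123", "fuel_lb": 80.0, "vibration": 0.6},
]
running_peaks = [update_peaks(peaks, r) for r in records]
```

The third record reports a lower vibration than the second, yet the running peak stays at the earlier value because that value was kept in the state.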
The state for a group is stored as a tuple, e.g. my_tuple = (value1, value2). The values in the tuple represent the combination of records for the group and need to fit the state schema that is declared when applyInPandasWithState is called. This could include an array or dictionary object.
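As a sketch of what such a tuple might look like, the function below folds a batch of readings into a state of three values: a record count, the peak vibration, and the list of every reading seen so far. The field names, the fold_readings helper, and the matching schema string are illustrative assumptions. The starting peak of 0.0 assumes readings are never negative.

```python
# Assumed state schema: one field per position in the state tuple.
STATE_SCHEMA = "reading_count BIGINT, peak_vibration DOUBLE, readings ARRAY<DOUBLE>"


def fold_readings(previous, new_readings):
    """Combine the previous state tuple (or None) with a batch of readings."""
    new_readings = list(new_readings)
    count, peak, readings = previous if previous is not None else (0, 0.0, [])
    return (
        count + len(new_readings),      # how many readings so far
        max([peak, *new_readings]),     # highest reading so far
        list(readings) + new_readings,  # every reading, oldest first
    )


state = fold_readings(None, [0.4, 0.9])   # first batch for the group
state = fold_readings(state, [0.6])       # a later batch for the same group
```

Keeping every reading in the state makes comparisons across the whole group possible, at the cost of a state that grows for as long as the group stays active.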
Scenario
This example is based on a scenario where the requirement was to store all the data for a particular group in the state and then apply AI algorithms to the set of data for that group. The AI then made predictions about possible issues that were about to occur and applied business rules to identify issues that had occurred. This enabled corrective action to be taken immediately, and pre-emptive action to be taken for predicted problems. The results of the AI were stored in the state and updated in a Delta table as new records came in. Once the set of group data had concluded, the state was removed.
For the demo the situation will be adapted to data being streamed from aircraft transmitting ACARS (Aircraft Communications Addressing and Reporting System) information. This data set includes critical information such as engine vibrations and fuel flow rates.
The data for each flight (group) will be stored in an active state while the plane is in flight. The data points are analysed every time new data is streamed. The data and the results of the analysis are also continuously written to the Delta table. Once the plane lands, the data held in the state is cleared and the final state, including the analysis, is stored in the Delta table. An illustration of this can be seen below.
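This lifecycle might be sketched as follows. The sketch assumes PySpark 3.4 or later, where applyInPandasWithState and its GroupState object are available. The column names (flight_id, engine_vibration, landed) and all function names are hypothetical, and a deliberately trivial "analysis" (the peak vibration) stands in for the real algorithms.

```python
def next_flight_state(previous, vibrations, landed):
    """Return (state_to_keep, output_row) for one flight.

    state_to_keep is None once the flight has landed, which signals that
    the stored state should be removed.
    """
    count, peak = previous if previous is not None else (0, 0.0)
    count += len(vibrations)
    peak = max([peak, *vibrations])
    status = "landed" if landed else "in_flight"
    kept = None if landed else (count, peak)
    return kept, (count, peak, status)


def update_flight(key, pdf_iter, state):
    """Function passed to applyInPandasWithState; called once per flight."""
    import pandas as pd  # imported lazily; present wherever PySpark runs

    (flight_id,) = key
    vibrations, landed = [], False
    for pdf in pdf_iter:  # a group may arrive as several pandas DataFrames
        vibrations.extend(pdf["engine_vibration"].astype(float).tolist())
        landed = landed or bool(pdf["landed"].any())

    previous = state.get if state.exists else None
    kept, (count, peak, status) = next_flight_state(previous, vibrations, landed)
    if kept is None:
        state.remove()       # flight finished: clear its state
    else:
        state.update(kept)   # flight continues: remember the new values

    yield pd.DataFrame({
        "flight_id": [flight_id],
        "reading_count": [count],
        "peak_vibration": [peak],
        "status": [status],
    })


def with_flight_state(df):
    """Attach update_flight to a streaming DataFrame grouped by flight."""
    from pyspark.sql.streaming.state import GroupStateTimeout

    return df.groupBy("flight_id").applyInPandasWithState(
        update_flight,
        outputStructType="flight_id STRING, reading_count BIGINT, "
                         "peak_vibration DOUBLE, status STRING",
        stateStructType="reading_count BIGINT, peak_vibration DOUBLE",
        outputMode="update",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
```

Separating the decision logic (next_flight_state) from the Spark plumbing keeps the former easy to unit test without a cluster.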
Requirements
To create a solution to the above scenario, the following requirements need to be met. They are similar to those of the real project on which this example is based:
- Update a delta table every time a new record is received
- Maintain the data points for a group so that the AI algorithm can be applied to it
- Maintain all the data points so that comparisons can be made each time a record is received
- Store the results of the AI in the State and update the Delta table
- Process 1 billion data points per day.
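To put the last requirement in perspective, a quick back-of-the-envelope calculation gives the average rate it implies. It assumes records arrive evenly across the day, which real traffic will not, so peak rates would be higher.

```python
RECORDS_PER_DAY = 1_000_000_000
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400 seconds

average_per_second = RECORDS_PER_DAY / SECONDS_PER_DAY
print(f"{average_per_second:,.0f} records per second on average")
# prints "11,574 records per second on average"
```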
Now that the background has been set, let's move on and set up a set of streamed CSV files. This will emulate aircraft sensor data being received, which can then be streamed using the applyInPandasWithState function. If this is of less interest to you, you can simply download the notebook at the top of the page and skip ahead to the applyInPandasWithState implementation.