How to generate PySpark streaming files

The demo is an example of how to set up a simulated streaming source of CSV files. It is the second part in a three-part series on streaming data with applyInPandasWithState. If you just want an example of how to create test stream files and have come directly to this page, then you are in the correct place. If you also want to learn about structured streaming and the applyInPandasWithState function, then you need to go back a step and read the first part of the article:

PySpark applyInPandasWithState introduction >

If you are just interested in the applyInPandasWithState function and have already read part one, then download the file stream generator and go to part 3 here:

PySpark structured streaming applyInPandasWithState worked example >

Article pages 

  1. PySpark applyInPandasWithState introduction >
  2. How to generate PySpark streaming files >
  3. PySpark structured streaming applyInPandasWithState worked example >

Download PySpark streaming example 

You can download the notebook for this example and adapt it for your own purposes.

For this demo we create a file stream that simulates the data received from the ARCAS aircraft systems. It will produce a stream of CSV files of simulated aircraft data that we can use in the next part of the article.

First, we import the required modules for the notebook. The random module will be used to generate some random sensor data values, the time and datetime functions are used to create timestamps, and the Spark SQL types are used to define the data schema.

Imports

import random
import time
from datetime import datetime

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

To run this notebook in your own environment you need to set these variables to your own values. This demo stores all the data in Unity Catalog volumes, so you must have a Unity Catalog catalog, schema and volume set up. Set the catalog name, schema name and volume name where you want to place a Delta table and the streamed CSV files that simulate the stream. You will also need to set your storage account location; in Azure this would be an Azure Data Lake Storage Gen2 location. Finally, you will need to create two folders in your volume, named data_dump and data_stream, for where the CSV files are initially dumped and where the cleaned CSVs for the streaming process are placed (a sketch for creating these folders from the notebook follows the variables below).

Global variables

catalog_name = "my-catalog"
schema_name = "demo-schema"
volume_name = "landing-zone"

# Change this to your storage account name.
# You can also specify a container name and directory location for the catalog, e.g. 'abfss://containername@accountname.dfs.core.windows.net/directoryname'
storage_account = "abfss://my-catalog.dfs.core.windows.net"

vol_flight_data_dump = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/data_dump/"
vol_flight_data_stream = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/data_stream/"
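
If the data_dump and data_stream folders do not already exist, they can be created from the notebook. This is a minimal sketch, assuming you are running in a Databricks notebook where dbutils is available and the path variables above point at your Unity Catalog volume:

# Create the dump and stream folders in the volume if they do not already exist
dbutils.fs.mkdirs(vol_flight_data_dump)
dbutils.fs.mkdirs(vol_flight_data_stream)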

Create flight data schema

Here the schema of the data required for the csv files is defined. These are the columns that will be written to the continuous stream of csv files. In the final tutorial part, the flight_id will be used to identify all the readings that belong to a particular flight. The data set also has an aircraft_id that will be re-used across multiple flights. We create several sensor data readings such as vibration, a timestamp for the reading and an indicator as to whether the plane has now landed. This will be used to indicate when we no longer need to hold a flight's data in the PySpark streaming state and can remove it.

###############################
# Define schema
schema = StructType([
    StructField("flight_id", IntegerType(), True),
    StructField("aircraft_id", IntegerType(), True),
    StructField("left_engine_oil_pressure", IntegerType(), True),
    StructField("right_engine_oil_pressure", IntegerType(), True),
    StructField("left_engine_fuel_flow_rate", IntegerType(), True),
    StructField("right_engine_fuel_flow_rate", IntegerType(), True),
    StructField("left_engine_vibration", IntegerType(), True),
    StructField("right_engine_vibration", IntegerType(), True),
    StructField("date_time", TimestampType(), True),
    StructField("landed", IntegerType(), True)
])

The data for the CSV files is generated inside a while loop. Next, the variables that control that loop need to be configured. The file_counter is used to increment the number appended to the end of each csv file name; it increases by 1 on every iteration of the loop. The flight_id is used to group data points together into flights, and the total_flight_data_points counter controls how many data points are created for each flight.


Loop variables

###############################
# Loop vars

# Used for file names
file_counter = 1

# Increments set of flight ids
flight_id = 0

# Used to move to the next set of flights after a set number of data points.
total_flight_data_points = 1

As this is a test stream, the loop run time is limited to 5 minutes using the time functions.

# Write stream files to volume for 5 minutes

# Run loop for 5 minutes 
duration = 300 
start_time = time.time()

While loop

  • In the while loop, a list comprehension is used to create a set of data that can be written into a csv file following the schema defined above. The if statement decides whether the flight has finished or not by setting the landed flag to 1 or 0. Records for 10 different flights are created with the for loop: for i in range(10).
  • The list comprehension creates a list called data. That data needs to be written to a csv file. This is done by creating a Spark dataframe from the data and the schema defined above, which is then written to a single CSV file using coalesce(1) to ensure it is written to one file rather than distributed across many files.
  • The drawback of using PySpark to write a csv file is that it also creates additional metadata files. So, after the write, the csv file is copied and renamed into a clean folder location. There are other ways of writing csv files that avoid this problem; pandas, for example, can write the csv file without the additional metadata (a sketch of this alternative is shown after the loop code below).
  • Finally, the counters are updated: the file counter keeps track of the total number of files written, the data point counter records how many data points have been logged for the current set of flights, and the flight id is advanced to the next set of 10 flights once they have landed. The while loop waits 1 second between iterations to simulate data being received from an IoT source.

while (time.time() - start_time) < duration:

    if (total_flight_data_points == 5):
        # Create data set using list comprehension
        # and set flight to landed
        data = [
            (
                flight_id + i, 
                i, 
                random.randint(1, 100),
                random.randint(1, 100),
                random.randint(1, 100),
                random.randint(1, 100),
                random.randint(1, 100),
                random.randint(1, 100),
                datetime.now(),
                1
            ) 
            for i in range(10)
        ]
    else:
        # Create data set using list comprehension
        # and set flight to not landed
        data = [
            (
                flight_id + i, 
                i, 
                random.randint(1, 100),
                random.randint(1, 100),
                random.randint(1, 100),
                random.randint(1, 100),
                random.randint(1, 100),
                random.randint(1, 100),
                datetime.now(),
                0
            ) 
            for i in range(10)
        ]

    # create flight data stream data frame
    df = spark.createDataFrame(data, schema)

    # Create single CSV file in filestream folder
    df.coalesce(1).write.mode("overwrite").csv(vol_flight_data_dump, header=True)

    ############################
    # Copy CSV file and rename to new location
    file_list = dbutils.fs.ls(vol_flight_data_dump)

    # csv file names
    name = ""
    new_name = "flight_data_" + str(file_counter) + ".csv"

    # Get csv file name
    for file_name in file_list:
        if file_name.name.endswith('csv'):
            name = file_name.name
            break

    # Copy the file into the new location with controlled permissions using UC
    dbutils.fs.cp(vol_flight_data_dump + name, vol_flight_data_stream + new_name)

    # Iterate to next set of flights
    if (total_flight_data_points == 5):
        total_flight_data_points = 0
        flight_id = flight_id + 10

    # Increment total number of data points logged for flight
    total_flight_data_points = total_flight_data_points + 1

    # increment to next file number
    file_counter = file_counter + 1

    # Wait 1 second before creating next file
    time.sleep(1)
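
As mentioned above, pandas can be used instead of Spark to write each csv file, which avoids the extra metadata files and removes the need for the copy step. This is a minimal sketch of that alternative, assuming pandas is available on the driver and that the volume path can be written to as a local path (as Unity Catalog volumes can be on Databricks):

import pandas as pd

# Build a pandas dataframe from the generated data, using the column names from the schema
pdf = pd.DataFrame(data, columns=[field.name for field in schema.fields])

# Write a single clean csv file straight into the stream folder, with no extra metadata files
pdf.to_csv(vol_flight_data_stream + "flight_data_" + str(file_counter) + ".csv", index=False)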

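Once the loop is running, you can sanity-check the output before moving on to the streaming part. This is a minimal sketch, assuming at least one file has already been written to the data_stream folder:

# List the generated csv files in the stream folder
display(dbutils.fs.ls(vol_flight_data_stream))

# Read the generated files back as a static dataframe to confirm the schema and values
sample_df = spark.read.csv(vol_flight_data_stream, header=True, schema=schema)
sample_df.show(5)
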
In the next part of this article the applyInPandasWithState function will be demonstrated.

PySpark structured streaming applyInPandasWithState worked example >