Uploading a big CSV file in Elixir is a tedious process if the whole file is read into memory at once. The Elixir solution is File.stream!: it handles the upload effectively because only one line is held in memory at a time. This article explains the steps to achieve large CSV file uploads using File.stream!.

Our milestones

Below are the steps to be followed:

  • Upload full CSV into the storage server
  • Process CSV
  • Monitor the status of the process

Upload full CSV into the storage server

Uploaded files are automatically stored in a temporary location by Plug as a %Plug.Upload{} struct, but that file is deleted as soon as the request completes. Processing a large file takes longer than a single request, so the temporary location cannot be relied on. The alternative is to copy the CSV to persistent file storage first.
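As a minimal sketch, assuming a Phoenix controller action that receives the upload as a %Plug.Upload{} struct (the uploads/ destination directory, controller name, and response shape are illustrative assumptions, not from the original article), the file can be copied out of the temporary location like this:

defmodule MyAppWeb.CsvUploadController do
  use MyAppWeb, :controller

  # Copies the uploaded CSV out of Plug's temporary directory into
  # persistent storage before the request finishes, so the file
  # survives after Plug deletes its temporary copy.
  def create(conn, %{"csv" => %Plug.Upload{path: tmp_path, filename: filename}}) do
    dest_dir = "uploads"
    File.mkdir_p!(dest_dir)

    dest_path = Path.join(dest_dir, filename)
    File.cp!(tmp_path, dest_path)

    # dest_path is what gets recorded in the process table described below
    json(conn, %{file_path: dest_path})
  end
end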

Process CSV

While storing the file, the following details are required and should be recorded (a minimal schema sketch follows this list):

  • Uploaded file path
  • Error file path – for error handling
  • Total rows – number of rows in the CSV
  • Processed row(s) – starts at zero
  • Failed row(s) – starts at zero
  • Loaded row(s) – starts at zero
  • Status – “New” in the beginning
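Here is a minimal sketch of such a process table as an Ecto schema; the table name csv_import_processes and the field names are illustrative assumptions rather than anything prescribed by the article:

defmodule MyApp.CsvImportProcess do
  use Ecto.Schema

  # One record per CSV upload; the counters are updated while the file
  # is processed so that progress can be monitored by the record's id.
  schema "csv_import_processes" do
    field :file_path, :string        # uploaded file path
    field :error_file_path, :string  # error file path, for error handling
    field :total_rows, :integer      # number of rows in the CSV
    field :processed_rows, :integer, default: 0
    field :failed_rows, :integer, default: 0
    field :loaded_rows, :integer, default: 0
    field :status, :string, default: "New"

    timestamps()
  end
end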

By storing the details mentioned above in a process table, the CSV upload can be monitored: the unique id of the process record gives access to the status and the other process details. Because the work cannot be completed within a single request, processing should be initiated through an asynchronous queueing system (a Redis-backed queue, a job library, etc.). Big files clearly have long processing times, and holding the original request open would end in a timeout. With the process-table approach, the job starts whenever the queue is free and does not suffer abrupt timeouts or stoppages.
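As one possible sketch of the queueing step, assuming the Oban job library (the article does not prescribe a specific queue; a Redis-backed queue works the same way), the worker receives the process id and streams the stored file:

defmodule MyApp.CsvImportWorker do
  use Oban.Worker, queue: :csv_imports

  # The job carries only the process id; the worker looks up the
  # process record and streams the file referenced by its file_path.
  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"process_id" => process_id}}) do
    process = MyApp.Repo.get!(MyApp.CsvImportProcess, process_id)
    MyApp.CsvImporter.run(process)  # hypothetical module wrapping the code below
    :ok
  end
end

# Enqueueing the job right after the process record is created:
# %{process_id: process.id} |> MyApp.CsvImportWorker.new() |> Oban.insert()

Below is the sample code for processing the stream: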

File.stream!("example.csv")
|> CSV.decode(headers: true)
|> Stream.filter(fn _row -> true end)  # any filtering step, if needed
|> Enum.reduce({0, 0, 0, []}, fn
  {:ok, row}, {processed_acc, loaded_acc, failed_acc, error_acc} ->
    processed_acc = processed_acc + 1

    case some_data_validation(row) do
      :ok ->
        # insert a single record into the database
        # update the process details
        {processed_acc, loaded_acc + 1, failed_acc, error_acc}

      {:error, reason} ->
        # update the process details
        {processed_acc, loaded_acc, failed_acc + 1, [reason | error_acc]}
    end

  {:error, reason}, {processed_acc, loaded_acc, failed_acc, error_acc} ->
    # rows that cannot be decoded are counted as failed
    {processed_acc + 1, loaded_acc, failed_acc + 1, [reason | error_acc]}
end)

To explain the steps mentioned in the above code:

  • File.stream!(“example.csv”) streams the file line by line
  • CSV.decode(headers: true) decodes each CSV row into a map keyed by the header row
  • Stream.filter(...) applies any filtering step, if needed
  • Enum.reduce() – updates the process monitoring details; a few accumulators hold the required counts, and the data validations are applied in the same pass
  • After the above steps complete, an error file is created from error_acc so that failures can be debugged with the help of the error file (a minimal sketch of this step follows the list)
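As a minimal sketch of that last step, assuming each entry in error_acc is a reason string and the error file path comes from the process record (the helper name write_error_file/2 is an assumption for illustration):

defmodule MyApp.CsvImporter do
  # Writes one accumulated error per line so the failed rows can be
  # inspected after the import finishes.
  def write_error_file(error_acc, error_file_path) do
    # error_acc was built by prepending, so reverse it to restore row order
    contents =
      error_acc
      |> Enum.reverse()
      |> Enum.join("\n")

    File.write!(error_file_path, contents)
  end
end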

Monitor the status of the process

During processing, the process details are updated continuously, so with the process's unique id all of the parameters (records processed, records successfully loaded, records in error) can be monitored. These parameters are useful for showing the progress of the import in the application and can be presented with graphical views such as progress bars and charts.
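As a minimal sketch of such a status check, assuming the schema sketched earlier, a hypothetical progress/1 function can turn the stored counters into a percentage suitable for a progress bar:

defmodule MyApp.CsvImportMonitor do
  # Looks up the process record by its unique id and reports how far
  # the import has progressed.
  def progress(process_id) do
    process = MyApp.Repo.get!(MyApp.CsvImportProcess, process_id)

    percent =
      if process.total_rows > 0 do
        Float.round(process.processed_rows / process.total_rows * 100, 1)
      else
        0.0
      end

    %{
      status: process.status,
      processed: process.processed_rows,
      loaded: process.loaded_rows,
      failed: process.failed_rows,
      percent: percent
    }
  end
end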