Stream API

API Recipe: Upload data in parallel

You have a table containing one billion rows, and you want to upload it to a Domo DataSet quickly. To do this, we divide the data into ten groups of 100 million rows each, and then upload the parts in parallel. Let's walk through this process.

For use with Big Data

Parallel uploading is generally used for massive DataSets, and this guide doubles as a practical walkthrough of the Stream API. For small DataSets, please use the DataSet API.

Data Division and Part Size

In this example, we divide one billion rows into ten parts of 100 million rows each. In practice, data parts should be at least 50 MB in size. The row count that yields a 50 MB part varies with the number of columns in the datasource and the general length of cell contents, so some experimentation with rows per part may be required for each datasource.
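As a rough starting point, you can estimate rows per part from a small sample of your data. The sketch below is illustrative only; the averageRowBytes value is an assumption you would measure from your own CSV output.

// Rough sizing sketch (illustrative): estimate rows needed for a ~50 MB uncompressed CSV part
long targetPartBytes = 50L * 1024 * 1024;   // 50 MB minimum part size
long averageRowBytes = 120;                 // assumption: measure this from a sample of your CSV rows
long rowsPerPart = Math.max(1, targetPartBytes / averageRowBytes);
long totalRows = 1_000_000_000L;
long partCount = (totalRows + rowsPerPart - 1) / rowsPerPart;  // ceiling division
System.out.println("Rows per part: " + rowsPerPart + ", parts: " + partCount);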

Parallel Upload Overview

Data Preparation (one billion row example)

  1. Query your database or datasource for the desired result set
  2. Convert the result set to a column/row format
  3. Partition the result set into groups of 100 million rows (or partition via multiple DB queries/offsets, as sketched after this list)
  4. Convert each group into header-less CSV files (In this example, ten CSV files would be generated)
  5. Gzip compress each CSV file (this significantly reduces upload time)
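
Steps 3 and 4 can often be handled directly in the database query. The following is a rough sketch, not part of the downloadable project, that pages through a hypothetical guest_list table with LIMIT/OFFSET and writes each page as a header-less UTF-8 CSV file; the JDBC URL, credentials, table, columns, and the naive comma-join (no quoting or escaping) are placeholder assumptions.

// Illustrative JDBC partitioning sketch: page through the source table and write header-less CSV parts
long rowsPerPart = 100_000_000L;
try (Connection conn = DriverManager.getConnection(jdbcUrl, dbUser, dbPassword)) {
    for (int part = 0; part < 10; part++) {
        try (PreparedStatement stmt = conn.prepareStatement(
                "SELECT friend, attending FROM guest_list LIMIT ? OFFSET ?")) {
            stmt.setLong(1, rowsPerPart);
            stmt.setLong(2, (long) part * rowsPerPart);
            try (ResultSet rs = stmt.executeQuery();
                 BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
                         new FileOutputStream("part" + (part + 1) + ".csv"), "UTF-8"))) {
                while (rs.next()) {
                    // No header row; quote/escape values as needed for real data
                    writer.write(rs.getString("friend") + "," + rs.getString("attending"));
                    writer.newLine();
                }
            }
        }
    }
}
catch (SQLException | IOException e) {
    logger.error("Error exporting CSV parts", e);
}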

Parallel Upload via Stream Pipeline

  1. Create a Stream Execution, or upload job, on the desired Domo DataSet
  2. Asynchronously upload all ten gzip files (data parts) in parallel, via the Execution
  3. Ensure all ten parts have uploaded successfully, retry any failed parts
  4. When all parts have been uploaded, commit the Stream Execution to finalize the upload job

Parallel Upload Tutorial - Java SDK

The following tutorial features code snippets from a fully executable project, which can be downloaded here.

We begin by assuming your data has already been divided into parts and written as CSV files on disk.

1 - Create a Domo DataSet via the Upload Accelerator/Stream API, or use an existing DataSet that was created using the Upload Accelerator/Stream API. For example:

// Define a Domo DataSet Schema
CreateDataSetRequest dataSetRequest = new CreateDataSetRequest();
dataSetRequest.setName("Leonhard Euler Party");
dataSetRequest.setDescription("Mathematician Guest List");
List<Column> columns = new ArrayList<>();
columns.add(new Column(STRING, "Friend"));
columns.add(new Column(STRING, "Attending"));
dataSetRequest.setSchema(new Schema(columns));

// Create the Domo DataSet using the Schema, via the Stream Pipeline API
StreamRequest streamRequest = new StreamRequest();
streamRequest.setDataSet(dataSetRequest);
streamRequest.setUpdateMethod(UpdateMethod.REPLACE);
Stream stream = streamClient.create(streamRequest);
System.out.println("Created:" + stream);

2 - Retrieve the Stream ID of the newly created DataSet Stream Pipeline, or the ID of an existing DataSet Stream Pipeline

// From a newly created/returned Stream Pipeline
long streamId = stream.getId();
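
If you are reusing an existing Stream Pipeline instead of creating one, the SDK's StreamClient also supports searching by the underlying DataSet. A minimal sketch, assuming you know that DataSet's ID, is shown below; verify the search field and method against your SDK version.

// Or, from an existing Stream Pipeline: search by the underlying DataSet ID
// (existingDataSetId is a placeholder; "dataSource.id" is the Stream search field)
List<Stream> existingStreams = streamClient.search("dataSource.id:" + existingDataSetId);
long streamId = existingStreams.get(0).getId();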

3 - Compress the CSV files within a temp folder, for convenience

//Compress the incoming csv files in a temp folder
List<File> csvFiles = loadExampleFiles();
File tempFolder = new File(csvFiles.get(0).getParent() + "/temp/");
if (!tempFolder.exists()){
    tempFolder.mkdir();
}
List<File> compressedCsvFiles = toGzipFilesUTF8(csvFiles, tempFolder.getPath() + "/");

// Gzip functions for convenience. UTF-8 encoding is critical.
public List<File> toGzipFilesUTF8( List<File> sourceFiles, String path){
    List<File> files = new ArrayList<>();
    for (File sourceFile : sourceFiles) {
        String zipFileName = sourceFile.getName().replace(".csv", ".zip");
        files.add(toGzipFileUTF8(sourceFile, path + zipFileName));
    }
    return files;
}

public File toGzipFileUTF8( File csvFile, String zipFilePath){
    File outputFile = new File(zipFilePath);
    // try-with-resources closes both streams, even if an error occurs mid-file
    try (GZIPOutputStream gzos = new GZIPOutputStream(new FileOutputStream(outputFile));
         BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(csvFile), "UTF-8"))) {

        String currentLine;
        while ((currentLine = reader.readLine()) != null){
            currentLine += System.lineSeparator();
            // Specifying UTF-8 encoding is critical; getBytes() otherwise uses the platform default charset
            gzos.write(currentLine.getBytes("UTF-8"));
        }

        gzos.finish();
    }
    catch(IOException e) {
        logger.error("Error compressing a string to gzip", e);
    }

    return outputFile;
}

4 - Create a Stream Pipeline Execution (Upload job declaration)

// Create an Execution on the given Stream Pipeline
Execution execution = this.streamClient.createExecution(streamId);

5 - Encapsulate each part upload in a Java Runnable task, for use with a Java ExecutorService

// Create the asynchronous executor service and task collection
ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<Object>> uploadTasks = Collections.synchronizedList(new ArrayList<>());

// For each data part (csv gzip file), create a runnable upload task
long partNum = 1;
for (File compressedCsvFile : compressedCsvFiles){
    long myPartNum = partNum;
    // "uploadDataPart()" accepts csv strings, csv files, and compressed csv files
    Runnable partUpload = () -> this.streamClient.uploadDataPart(streamId, execution.getId(), myPartNum, compressedCsvFile); 
    uploadTasks.add(Executors.callable(partUpload));
    partNum++;
}

6 - Asynchronously execute the uploads via the Java ExecutorService

// Asynchronously execute all uploading tasks
try {
    executorService.invokeAll(uploadTasks);
}
catch (Exception e){
    logger.error("Error uploading all data parts", e);
}
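
invokeAll blocks until every task has finished, but the snippet above does not check results or retry failures. An expanded variant is sketched below; it assumes a failed uploadDataPart call throws an exception (surfaced as an ExecutionException on the Future) and simply retries each failed part once before moving on.

// Expanded variant: inspect each upload task and retry failed parts once (sketch)
try {
    List<Future<Object>> results = executorService.invokeAll(uploadTasks);
    for (int i = 0; i < results.size(); i++) {
        try {
            results.get(i).get();
        }
        catch (ExecutionException e) {
            logger.error("Data part " + (i + 1) + " failed; retrying once", e);
            executorService.submit(uploadTasks.get(i)).get();  // add backoff/more attempts as needed
        }
    }
}
catch (Exception e){
    logger.error("Error uploading all data parts", e);
}
finally {
    executorService.shutdown();  // let the worker threads wind down once uploads are finished
}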

7 - Commit the Stream Pipeline Execution (Finalize the upload job)

// Commit the Stream Execution to finalize the multi-part upload
this.streamClient.commitExecution(streamId, execution.getId());

8 - Delete the gzip temp folder

// Delete the compressed files, then the temp folder (Files.delete requires an empty directory)
if (tempFolder.exists()) {
    try {
        for (File compressedCsvFile : compressedCsvFiles) {
            Files.deleteIfExists(compressedCsvFile.toPath());
        }
        Files.delete(tempFolder.toPath());
    }
    catch (Exception e){
        logger.error("Error deleting the compressed csv temp folder", e);
    }
}

In summary, divide your data into compressed CSV files, create a Stream Pipeline Execution, upload all parts asynchronously, and then commit the Execution to finalize the upload job.