Break down datasets to avoid out of memory errors in migrations

By Kevin, October 21st, 2019

Having worked on a project this year that makes extensive use of Migrate in Drupal 8, I have picked up a few considerations to keep in mind when running migrations on a remote system.

In our case, we have a half dozen ongoing migrations that run every night against JSON files delivered from a remote system. Generating that output on the remote system is too time intensive to do on demand, so we are not fetching it via the HTTP capability of Migrate Plus.

A few of these files contain over 25,000 records. Not a very big number. However, due to certain constraints on the Pantheon platform, we were encountering out of memory errors when processing the larger sets, especially when refreshing established data with the `--update` argument. The Entity API is quite powerful, but also rough on system resources when performing as many operations as we do in a migration.

The first thought would be to process the data in smaller sets and batch it out. Currently, the migrate:import command does not appear to take a filename argument or offer a way to split JSON data into separate processes. Fortunately, with a little work, we can get there rather quickly and set ourselves up for success.

The Shell Script

First, we need the ability to take a JSON file and break it down into many smaller files while keeping the expected structure intact (so the data selector in the defined migration files does not break). Second, we need a migrate command that ingests data but lets us specify a filename as the source.

Querying and parsing JSON can be done by leveraging the robust and powerful jq tool. We essentially want to walk through the data with a range query and output each slice to a new JSON file. By doing this, we can take smaller sets and generate new JSON files as our source data for the migration. You could take many roads to get there; in our case we stuck to a shell script for compatibility with team development environments and the server. jq makes short work of the slicing, leaving us to implement just the loop.
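
To give a sense of the slicing, a single jq expression (a minimal illustration, not the final script) pulls a contiguous range of records out from under the data key of a hypothetical source.json:

jq -c '.data[0:1000]' source.json

Wrapping that in a loop and re-wrapping each slice in the expected structure gives us the script below: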

#!/usr/bin/env bash

set +ev

if [[ -z "$1" ]]; then
  echo 'Error: Please pass a source filename as the first argument.' >&2
  exit 1
fi

FILENAME=$1
CHUNKSIZE=$2
OUTPUT=${FILENAME//.json/}

if [[ -z "$2" ]]; then
  CHUNKSIZE=1000
fi

if ! [ -x "$(command -v jq)" ]; then
  echo 'Error: jq is not installed. You can obtain it from https://stedolan.github.io/jq/download.' >&2
  exit 1
fi

TOTAL=$(jq -c '.data | length' "$FILENAME")

echo "$TOTAL records found in $FILENAME. Proceeding with chunking."

i=0
INDEX=1

if [ $TOTAL -gt 0 ]; then
  while [ $i -lt $TOTAL ]
  do
    END=$(( $i + $CHUNKSIZE ))
    # Slice out the next chunk of records and re-wrap it under the "data"
    # key so the data selector in the migration definitions still matches.
    RECORDS=$(jq -c ".data[$i:$END]" "$FILENAME")
    SET="{\"data\":$RECORDS}"
    echo "$SET" > "$OUTPUT-$INDEX.json"
    echo "Writing records $i to $END into $OUTPUT-$INDEX.json"
    i=$END
    INDEX=$(( INDEX + 1 ))
  done
fi

set -v

The script takes two arguments: a filename (the source file) and a chunk size. If the latter is not provided, we default to 1,000 records per file.

As the loop executes, a slice of data is taken and wrapped in the expected JSON structure to preserve the data selector path used in our already defined migration files. Each chunk is then written to the same directory as the source file.

This could be made into a more formal command in the future, with options for the source and destination directories, but it gets us started.
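
Assuming the script above is saved as chunk.sh (the name is arbitrary) and made executable, splitting a nightly export into files of 5,000 records each would look like this:

./chunk.sh working-paper.json 5000

This writes working-paper-1.json, working-paper-2.json, and so on alongside the source file, ready to be fed to the command below.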

Custom Migrate Command

Next, we need a migrate command that supports a filename argument so we can process one file at a time, calling it in a new process for each file to free up resources between runs. The command is more or less a clone of the existing migrate:import command, except it has a new option that changes how the migration is executed when a filename is provided.

<?php

namespace Drupal\mymodule\Commands;

use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\migrate_tools\Commands\MigrateToolsCommands;
use Drupal\migrate_tools\MigrateExecutable;
use Drupal\migrate_tools\MigrateTools;

/**
 * Class MigrateCommands
 *
 * @package Drupal\mymodule\Commands
 */
class MigrateCommands extends MigrateToolsCommands {

  /**
   * Perform migration import processes in a batch.
   *
   * @param string $migration_names
   *   ID of the migration(s) to import, separated by commas.
   * @param array $options
   *   Additional options for the command.
   *
   * @command migrate:batch
   *
   * @default $options []
   *
   * @usage migrate:batch --all
   *   Perform all migrations
   * @usage migrate:batch working_paper --filename=private://migrations/working-paper-1.json
   *   Import all migrations in working_paper using a specific file
   *
   * @validate-module-enabled migrate_tools
   *
   * @aliases mbat, migrate-batch
   *
   * @throws \Exception
   *   If there are not enough parameters to the command.
   */
  public function batch($migration_names = '', array $options = [
    'all' => FALSE,
    'group' => self::REQ,
    'tag' => self::REQ,
    'limit' => self::REQ,
    'feedback' => self::REQ,
    'filename' => self::REQ,
    'idlist' => self::REQ,
    'idlist-delimiter' => ':',
    'update' => FALSE,
    'force' => FALSE,
    'execute-dependencies' => FALSE,
  ]) {
    $options += [
      'all' => NULL,
      'group' => NULL,
      'tag' => NULL,
      'limit' => NULL,
      'feedback' => NULL,
      'filename' => NULL,
      'idlist' => NULL,
      'idlist-delimiter' => ':',
      'update' => NULL,
      'force' => NULL,
      'execute-dependencies' => NULL,
    ];
    $group_names = $options['group'];
    $tag_names = $options['tag'];
    $all = $options['all'];
    $additional_options = [];
    if (!$all && !$group_names && !$migration_names && !$tag_names) {
      throw new \Exception(dt('You must specify --all, --group, --tag or one or more migration names separated by commas'));
    }

    $possible_options = [
      'limit',
      'feedback',
      'filename',
      'idlist',
      'idlist-delimiter',
      'update',
      'force',
      'execute-dependencies',
    ];
    foreach ($possible_options as $option) {
      if ($options[$option]) {
        $additional_options[$option] = $options[$option];
      }
    }

    $migrations = $this->migrationsList($migration_names, $options);
    if (empty($migrations)) {
      $this->logger->error(dt('No migrations found.'));
    }

    // Take it one group at a time, importing the migrations within each group.
    foreach ($migrations as $group_id => $migration_list) {
      array_walk(
        $migration_list,
        [$this, 'executeMigration'],
        $additional_options
      );
    }
  }

  /**
   * Executes a single migration.
   *
   * If the --execute-dependencies option was given,
   * the migration's dependencies will also be executed first.
   *
   * @param \Drupal\migrate\Plugin\MigrationInterface $migration
   *   The migration to execute.
   * @param string $migration_id
   *   The migration ID (not used, just an artifact of array_walk()).
   * @param array $options
   *   Additional options of the command.
   *
   * @default $options []
   *
   * @throws \Exception
   *   If some migrations failed during execution.
   */
  protected function executeMigration(MigrationInterface $migration, $migration_id, array $options = []) {
    // Keep track of all migrations run during this command so the same
    // migration is not run multiple times.
    static $executed_migrations = [];

    if (isset($options['execute-dependencies'])) {
      $required_migrations = $migration->get('requirements');
      $required_migrations = array_filter($required_migrations, function ($value) use ($executed_migrations) {
        return !isset($executed_migrations[$value]);
      });

      if (!empty($required_migrations)) {
        $manager = $this->migrationPluginManager;
        $required_migrations = $manager->createInstances($required_migrations);
        $dependency_options = array_merge($options, ['is_dependency' => TRUE]);
        array_walk($required_migrations, [$this, __FUNCTION__], $dependency_options);
        $executed_migrations += $required_migrations;
      }
    }
    if (!empty($options['force'])) {
      $migration->set('requirements', []);
    }
    if (!empty($options['update'])) {
      if (empty($options['idlist'])) {
        $migration->getIdMap()->prepareUpdate();
      }
      else {
        $source_id_values_list = MigrateTools::buildIdList($options);
        $keys = array_keys($migration->getSourcePlugin()->getIds());
        foreach ($source_id_values_list as $source_id_values) {
          $migration->getIdMap()->setUpdate(array_combine($keys, $source_id_values));
        }
      }
    }

    if (!empty($options['filename'])) {
      $source = $migration->getSourceConfiguration();
      $source['urls'] = [$options['filename']];
      $migration->set('source', $source);
    }

    $executable = new MigrateExecutable($migration, $this->getMigrateMessage(), $options);
    // drush_op() provides --simulate support.
    drush_op([$executable, 'import']);
    $executed_migrations += [$migration_id => $migration_id];
    if ($count = $executable->getFailedCount()) {
      // Nudge Drush to use a non-zero exit code.
      throw new \Exception(
        dt(
          '!name Migration - !count failed.',
          ['!name' => $migration_id, '!count' => $count]
        )
      );
    }
  }
}

This adds a new command: migrate:batch. If we pass it a file path in the --filename option, it will override the source definition to use that file instead of what is mapped in the migration YAML file. If no file is provided, the migration executes as it normally would.
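
For example, running the working_paper migration against one of the chunked files (mirroring the @usage annotation in the command above) looks like this:

drush migrate:batch working_paper --filename=private://migrations/working-paper-1.json --update

The --update flag is only needed when refreshing records that have already been imported; without it, previously imported rows are skipped.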

Now we have a way to call migrate with smaller files so we can fit inside the constraints on the server when processing a lot of data. The last task here would be creating a scheduled task that loops over these new files and calls migrate:batch against each one sequentially with Drush. Since the number of files generated can vary, it would also be pertinent to go back and enhance the shell script to drop the files into a specified location per migration.
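
A rough sketch of that wrapper, assuming the chunked files land in the private files directory referenced by the @usage example above (the exact paths are placeholders to adapt to your setup):

#!/usr/bin/env bash

set -e

MIGRATION=working_paper
# Filesystem location of the chunked files; adjust to wherever the
# chunking script drops its output.
CHUNK_DIR=sites/default/files/private/migrations

for FILE in "$CHUNK_DIR"/working-paper-*.json; do
  # Each chunk runs in its own Drush process, so memory is released
  # between invocations. --update refreshes records imported on previous runs.
  drush migrate:batch "$MIGRATION" --filename="private://migrations/$(basename "$FILE")" --update
done

Wired into cron or the platform's scheduled jobs, this keeps each invocation small enough to stay under the memory limits that tripped up the single large import.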