setName(self::NAME)
            ->addOption(
                '--newer-into-older',
                null,
                InputOption::VALUE_NONE,
                'By default, this command will merge older contacts and activity into the newer. Use this flag to reverse that behavior.'
            )
            ->addOption(
                '--batch',
                null,
                InputOption::VALUE_REQUIRED,
                'How many contact duplicates to process at once. Defaults to 100.',
                100
            )
            ->addOption(
                '--processes',
                null,
                InputOption::VALUE_REQUIRED,
                'The commands can run in multiple PHP processes. This option defines how many processes to run. Defaults to 1.',
                1
            )
            ->setHelp(
                <<<'EOT'
The %command.name% command will dedpulicate contacts based on unique identifier values.

php %command.full_name%
EOT
            );
    }

    /**
     * Finds contacts that share unique identifier values and merges them in batches,
     * delegating every batch to a separate console sub-process.
     *
     * Steps:
     *  1. Read and validate the --batch / --processes options.
     *  2. Count the duplicates; exit early when there is nothing to merge.
     *  3. Split the duplicate contact IDs into chunks of --batch IDs and queue one
     *     DeduplicateIdsCommand sub-process per chunk.
     *  4. Poll the queue until it stops processing, updating a progress bar.
     *  5. Print each sub-process' command line plus its stdout (exit code 0) or stderr (otherwise).
     *
     * @return int Command::SUCCESS when there was nothing to do or every sub-process exited with 0;
     *             Command::FAILURE when an option is invalid or at least one sub-process failed
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $newerIntoOlder = (bool) $input->getOption('newer-into-older');
        $batch          = (int) $input->getOption('batch');
        $processes      = (int) $input->getOption('processes');

        // A batch size below 1 would divide by zero in the ceil() below and make array_chunk() throw.
        // A process limit below 1 is meaningless as well (NOTE(review): ProcessQueue is not visible
        // here, so its exact behaviour with such a limit is unconfirmed).
        if ($batch < 1 || $processes < 1) {
            $output->writeln('The --batch and --processes options must be integers greater than 0.');

            return Command::FAILURE;
        }

        $uniqueFields     = $this->contactDeduper->getUniqueFields('lead');
        $uniqueFieldNames = array_keys($uniqueFields);
        $duplicateCount   = $this->contactDeduper->countDuplicatedContacts($uniqueFieldNames);

        if (!$duplicateCount) {
            $output->writeln('No contacts to deduplicate.');

            // Having nothing to merge is a normal outcome, not an error: report success so that
            // schedulers and scripts calling this command do not register a failed run.
            return Command::SUCCESS;
        }

        $stopwatch = new Stopwatch();
        $stopwatch->start('deduplicate');

        $output->writeln('Deduplicating contacts based on unique identifiers: '.implode(', ', $uniqueFields));
        $output->writeln("{$duplicateCount} contacts found to deduplicate");

        $processQueue = new ProcessQueue($processes);

        // Estimate only: derived from the duplicate count before the IDs themselves are loaded.
        $processCount = (int) ceil($duplicateCount / $batch);

        $output->writeln('');
        // The "\n" reproduces the line break that sits inside this message.
        $output->writeln("Finding duplicates and creating processes for deduplication. \n{$processCount} processes will be queued.");

        $contactIds      = $this->contactDeduper->getDuplicateContactIds($uniqueFieldNames);
        $contactIdChunks = array_chunk($contactIds, $batch);

        // From here on use the number of processes that are really queued, so the progress bar
        // maximum always matches what the queue will report as processed.
        $processCount = count($contactIdChunks);

        // Loop-invariant pieces of every sub-process invocation.
        $console    = $this->params->get('kernel.project_dir').'/bin/console';
        $envParams  = [
            'db_table_prefix'                     => MAUTIC_TABLE_PREFIX,
            'contact_unique_identifiers_operator' => $this->params->get('mautic.contact_unique_identifiers_operator'),
        ];
        $processEnv = ['MAUTIC_CONFIG_PARAMETERS' => json_encode($envParams)];

        foreach ($contactIdChunks as $contactIdBatch) {
            $command = [
                $console,
                DeduplicateIdsCommand::NAME,
                '--contact-ids',
                implode(',', $contactIdBatch),
                '-e',
                MAUTIC_ENV,
            ];

            if ($newerIntoOlder) {
                $command[] = '--newer-into-older';
            }

            $processQueue->enqueue(new Process($command, null, $processEnv));
        }

        $output->writeln('');
        $output->writeln("Starting to execute the {$processCount} processes for deduplication. {$processes} processes will be executed in parallel.");

        $progressBar = new ProgressBar($output, $processCount);
        $progressBar->setFormat('debug');
        $progressBar->start();

        $processQueue->refresh();

        while ($processQueue->isProcessing()) {
            usleep(100); // microseconds: a very short pause between queue polls
            $processQueue->refresh();
            $progressBar->setProgress($processQueue->getProcessedCount());
        }

        // Finish the bar before anything else is printed: finish() redraws the bar, and doing
        // that after the process output has been written would draw over that output.
        $progressBar->finish();

        $output->writeln('');
        $output->writeln('');
        $output->writeln('All processes have finished. The output of each process is below.');

        $failedProcesses = 0;

        foreach ($processQueue->getProcessed() as $process) {
            $output->writeln($process->getCommandLine());

            if (0 === $process->getExitCode()) {
                $output->writeln($process->getOutput());
            } else {
                ++$failedProcesses;
                $output->writeln($process->getErrorOutput());
            }
        }

        $event = $stopwatch->stop('deduplicate');

        $output->writeln('');
        $output->writeln("Duration: {$event->getDuration()} ms, Memory: {$event->getMemory()} bytes");

        // Surface sub-process failures through the exit code instead of always reporting success.
        return 0 === $failedProcesses ? Command::SUCCESS : Command::FAILURE;
    }

    /**
     * Short description of the command.
     *
     * NOTE(review): left untyped on purpose; presumably this overrides the untyped static
     * property of Symfony's Command base class, where adding a type would be an error — confirm.
     */
    protected static $defaultDescription = 'Merge contacts based on same unique identifiers';
}