syncDateHelper->setSyncDateTimes($this->inputOptionsDAO->getStartDateTime(), $this->inputOptionsDAO->getEndDateTime()); $this->integrationSyncProcess->setupSync($this->inputOptionsDAO, $this->mappingManualDAO, $this->integrationSyncDataExchange); $this->mauticSyncProcess->setupSync($this->inputOptionsDAO, $this->mappingManualDAO, $this->internalSyncDataExchange); if ($this->inputOptionsDAO->pullIsEnabled()) { $this->executeIntegrationSync(); } if ($this->inputOptionsDAO->pushIsEnabled()) { $this->syncDateHelper->setInternalSyncStartDateTime(); $this->executeInternalSync(); } // Tell listeners sync is done $this->eventDispatcher->dispatch( new SyncEvent($this->inputOptionsDAO), IntegrationEvents::INTEGRATION_POST_EXECUTE ); } private function executeIntegrationSync(): void { $this->syncIteration = 1; while (true) { DebugLogger::log( $this->mappingManualDAO->getIntegration(), sprintf('Integration to Mautic; syncing iteration %s', $this->syncIteration), self::class.':'.__FUNCTION__ ); $syncReport = $this->integrationSyncProcess->getSyncReport($this->syncIteration); if (!$syncReport->shouldSync()) { DebugLogger::log( $this->mappingManualDAO->getIntegration(), 'Integration to Mautic; no objects were mapped to be synced', self::class.':'.__FUNCTION__ ); break; } // Update the mappings in case objects have been converted such as Lead -> Contact $this->mappingHelper->remapIntegrationObjects($syncReport->getRemappedObjects()); // Maps relations, synchronizes missing objects if necessary $this->manageRelations($syncReport); // Convert the integrations' report into an "order" or instructions for Mautic $syncOrder = $this->mauticSyncProcess->getSyncOrder($syncReport); if (!$syncOrder->shouldSync()) { DebugLogger::log( $this->mappingManualDAO->getIntegration(), 'Integration to Mautic; no object changes were recorded possible due to field direction configurations', self::class.':'.__FUNCTION__ ); break; } DebugLogger::log( $this->mappingManualDAO->getIntegration(), sprintf( 'Integration to Mautic; syncing %d total objects', $syncOrder->getObjectCount() ), self::class.':'.__FUNCTION__ ); // Execute the sync instructions $objectMappings = $this->internalSyncDataExchange->executeSyncOrder($syncOrder); // Dispatch an event to allow subscribers to take action after this batch of objects has been synced to Mautic $orderResults = $this->getOrderResultsForIntegrationSync($syncOrder, $objectMappings); $this->eventDispatcher->dispatch( new CompletedSyncIterationEvent($orderResults, $this->syncIteration, $this->inputOptionsDAO, $this->mappingManualDAO), IntegrationEvents::INTEGRATION_BATCH_SYNC_COMPLETED_INTEGRATION_TO_MAUTIC ); unset($orderResults); if ($this->shouldStopIntegrationSync()) { break; } // Fetch the next iteration/batch ++$this->syncIteration; } } private function executeInternalSync(): void { $this->syncIteration = 1; while (true) { DebugLogger::log( $this->mappingManualDAO->getIntegration(), sprintf('Mautic to integration; syncing iteration %s', $this->syncIteration), self::class.':'.__FUNCTION__ ); $syncReport = $this->mauticSyncProcess->getSyncReport($this->syncIteration); if (!$syncReport->shouldSync()) { DebugLogger::log( $this->mappingManualDAO->getIntegration(), 'Mautic to integration; no objects were mapped to be synced', self::class.':'.__FUNCTION__ ); break; } // Convert the internal report into an "order" or instructions for the integration $syncOrder = $this->integrationSyncProcess->getSyncOrder($syncReport); if (!$syncOrder->shouldSync()) { DebugLogger::log( $this->mappingManualDAO->getIntegration(), 'Mautic to integration; no object changes were recorded possible due to field direction configurations', self::class.':'.__FUNCTION__ ); // Finalize notifications such as injecting user notifications $this->notifier->finalizeNotifications(); break; } DebugLogger::log( $this->mappingManualDAO->getIntegration(), sprintf( 'Mautic to integration; syncing %d total objects', $syncOrder->getObjectCount() ), self::class.':'.__FUNCTION__ ); // Execute the sync instructions $this->integrationSyncDataExchange->executeSyncOrder($syncOrder); // Save mappings and cleanup $this->finalizeSync($syncOrder); // Dispatch an event to allow subscribers to take action after this batch of objects has been synced to the integration $orderResults = $this->getOrderResultsForInternalSync($syncOrder); $this->eventDispatcher->dispatch( new CompletedSyncIterationEvent($orderResults, $this->syncIteration, $this->inputOptionsDAO, $this->mappingManualDAO), IntegrationEvents::INTEGRATION_BATCH_SYNC_COMPLETED_MAUTIC_TO_INTEGRATION ); unset($orderResults); // Fetch the next iteration/batch ++$this->syncIteration; } } private function manageRelations(ReportDAO $syncReport): void { // Map relations $this->relationsHelper->processRelations($this->mappingManualDAO, $syncReport); // Relation objects we need to synchronize $objectsToSynchronize = $this->relationsHelper->getObjectsToSynchronize(); if (!empty($objectsToSynchronize)) { $this->synchronizeMissingObjects($objectsToSynchronize, $syncReport); } } private function synchronizeMissingObjects(array $objectsToSynchronize, ReportDAO $syncReport): void { $inputOptions = $this->getInputOptionsForObjects($objectsToSynchronize); // We need to synchronize missing relation ids $this->processParallelSync($inputOptions); // Now we can map relations for objects we have just synchronised $this->relationsHelper->processRelations($this->mappingManualDAO, $syncReport); } /** * @throws \Mautic\IntegrationsBundle\Exception\InvalidValueException */ private function getInputOptionsForObjects(array $objectsToSynchronize): InputOptionsDAO { $mauticObjectIds = new ObjectIdsDAO(); foreach ($objectsToSynchronize as $object) { $mauticObjectIds->addObjectId($object->getObject(), $object->getObjectId()); } $integration = $this->mappingManualDAO->getIntegration(); return new InputOptionsDAO([ 'integration' => $integration, 'integration-object-id' => $mauticObjectIds, ]); } /** * @throws IntegrationNotFoundException */ private function processParallelSync($inputOptions): void { $currentSyncProcess = clone $this->integrationSyncProcess; $this->syncService->processIntegrationSync($inputOptions); // We need to bring back current $inputOptions which were overwritten by new sync $this->integrationSyncProcess = $currentSyncProcess; } private function shouldStopIntegrationSync(): bool { // We don't want to iterate sync for specific ids return null !== $this->inputOptionsDAO->getIntegrationObjectIds(); } /** * @throws IntegrationNotFoundException * @throws HandlerNotSupportedException */ private function finalizeSync(OrderDAO $syncOrder): void { // Save the mappings between Mautic objects and the integration's objects $this->mappingHelper->saveObjectMappings($syncOrder->getObjectMappings()); // Remap integration objects to Mautic objects if applicable $this->mappingHelper->remapIntegrationObjects($syncOrder->getRemappedObjects()); // Update last sync dates on existing object mappings $this->mappingHelper->updateObjectMappings($syncOrder->getUpdatedObjectMappings()); // Tell sync that these objects have been deleted and not to continue re-syncing them $this->mappingHelper->markAsDeleted($syncOrder->getDeletedObjects()); // Inject notifications $this->notifier->noteMauticSyncIssue($syncOrder->getNotifications()); // Cleanup field tracking for successfully synced objects $this->internalSyncDataExchange->cleanupProcessedObjects($syncOrder->getSuccessfullySyncedObjects()); } private function getOrderResultsForIntegrationSync(OrderDAO $syncOrder, ObjectMappingsDAO $objectMappings): OrderResultsDAO { // New objects were processed by OrderExecutioner $newObjectMappings = $objectMappings->getNewMappings(); // Updated objects were processed by OrderExecutioner $updatedObjectMappings = $objectMappings->getUpdatedMappings(); // Remapped objects $remappedObjects = $syncOrder->getRemappedObjects(); // Deleted objects $deletedObjects = $syncOrder->getDeletedObjects(); return new OrderResultsDAO($newObjectMappings, $updatedObjectMappings, $remappedObjects, $deletedObjects); } private function getOrderResultsForInternalSync(OrderDAO $syncOrder): OrderResultsDAO { // New object mappings $newObjectMappings = $syncOrder->getObjectMappings(); // Updated object mappings $updatedObjectMappings = []; foreach ($syncOrder->getUpdatedObjectMappings() as $updatedObjectMapping) { if (!$updatedObjectMapping->getObjectMapping()) { continue; } $updatedObjectMappings[] = $updatedObjectMapping->getObjectMapping(); } // Remapped objects $remappedObjects = $syncOrder->getRemappedObjects(); // Deleted objects $deletedObjects = $syncOrder->getDeletedObjects(); return new OrderResultsDAO($newObjectMappings, $updatedObjectMappings, $remappedObjects, $deletedObjects); } }