mautic / app /bundles /ChannelBundle /Model /MessageQueueModel.php
chrisbryan17's picture
Upload folder using huggingface_hub
d2897cd verified
<?php
namespace Mautic\ChannelBundle\Model;
use Doctrine\ORM\EntityManagerInterface;
use Mautic\ChannelBundle\ChannelEvents;
use Mautic\ChannelBundle\Entity\MessageQueue;
use Mautic\ChannelBundle\Event\MessageQueueBatchProcessEvent;
use Mautic\ChannelBundle\Event\MessageQueueEvent;
use Mautic\ChannelBundle\Event\MessageQueueProcessEvent;
use Mautic\CoreBundle\Helper\CoreParametersHelper;
use Mautic\CoreBundle\Helper\UserHelper;
use Mautic\CoreBundle\Model\FormModel;
use Mautic\CoreBundle\Security\Permissions\CorePermissions;
use Mautic\CoreBundle\Translation\Translator;
use Mautic\LeadBundle\Entity\Lead;
use Mautic\LeadBundle\Model\CompanyModel;
use Mautic\LeadBundle\Model\LeadModel;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;
use Symfony\Contracts\EventDispatcher\Event;
/**
* @extends FormModel<MessageQueue>
*/
class MessageQueueModel extends FormModel
{
/**
* @var string A default message reschedule interval
*/
public const DEFAULT_RESCHEDULE_INTERVAL = 'PT15M';
/**
 * Stores the contact and company models on this model (promoted properties) and
 * forwards the remaining shared services to the FormModel constructor.
 */
public function __construct(
    protected LeadModel $leadModel,
    protected CompanyModel $companyModel,
    CoreParametersHelper $coreParametersHelper,
    EntityManagerInterface $em,
    CorePermissions $security,
    EventDispatcherInterface $dispatcher,
    UrlGeneratorInterface $router,
    Translator $translator,
    UserHelper $userHelper,
    LoggerInterface $mauticLogger,
) {
    // The parent expects the parameters helper last, unlike this constructor's own order.
    parent::__construct(
        $em,
        $security,
        $dispatcher,
        $router,
        $translator,
        $userHelper,
        $mauticLogger,
        $coreParametersHelper,
    );
}
/**
 * Returns the Doctrine repository registered for the MessageQueue entity.
 *
 * @return \Mautic\ChannelBundle\Entity\MessageQueueRepository
 */
public function getRepository()
{
    $repository = $this->em->getRepository(MessageQueue::class);

    return $repository;
}
/**
 * Applies the channel's frequency rules to a set of contacts and defers those that hit a rule.
 *
 * Every contact reported by the frequency rule repository is either rescheduled (when
 * $messageQueue holds an entry for it) or added to the queue, and is then removed from $leads.
 *
 * @param array    $leads             contacts keyed by contact ID; deferred contacts are removed in place
 * @param string   $channel           channel name; also the prefix of the "<channel>_frequency_number"
 *                                    and "<channel>_frequency_time" configuration parameters
 * @param int      $channelId
 * @param int|null $campaignEventId
 * @param int      $attempts          max attempts passed on when a message is queued
 * @param int      $priority
 * @param mixed    $messageQueue      optional map of contact ID => already queued message to reschedule
 * @param string   $statTableName
 * @param string   $statContactColumn
 * @param string   $statSentColumn
 *
 * @return array IDs of the deferred contacts, keyed by the same ID
 */
public function processFrequencyRules(
    array &$leads,
    $channel,
    $channelId,
    $campaignEventId = null,
    $attempts = 3,
    $priority = MessageQueue::PRIORITY_NORMAL,
    $messageQueue = null,
    $statTableName = 'email_stats',
    $statContactColumn = 'lead_id',
    $statSentColumn = 'date_sent'
): array {
    // Build an ID => ID map of the contacts under consideration.
    $contactIds = array_keys($leads);
    $contactIds = array_combine($contactIds, $contactIds);

    /** @var \Mautic\LeadBundle\Entity\FrequencyRuleRepository $ruleRepository */
    $ruleRepository = $this->em->getRepository(\Mautic\LeadBundle\Entity\FrequencyRule::class);

    $rulesHit = $ruleRepository->getAppliedFrequencyRules(
        $channel,
        $contactIds,
        $this->coreParametersHelper->get($channel.'_frequency_number'),
        $this->coreParametersHelper->get($channel.'_frequency_time'),
        $statTableName,
        $statContactColumn,
        $statSentColumn
    );

    $deferred = [];

    foreach ($rulesHit as $rule) {
        // We only deal with date intervals here (no time intervals) so it's safe to use 'P'
        $delay     = new \DateInterval('P1'.substr($rule['frequency_time'], 0, 1));
        $contactId = $rule['lead_id'];

        $hasQueuedMessage = $messageQueue && isset($messageQueue[$contactId]);

        if (!$hasQueuedMessage) {
            // Queue this message to be processed by frequency and priority
            $this->queue(
                [$leads[$contactId]],
                $channel,
                $channelId,
                $delay,
                $attempts,
                $priority,
                $campaignEventId
            );
        } else {
            $this->reschedule($messageQueue[$contactId], $delay);
        }

        $deferred[$contactId] = $contactId;
        unset($leads[$contactId]);
    }

    return $deferred;
}
/**
 * Adds messages to the queue.
 *
 * A contact is skipped when the repository already returns a message for the same
 * channel, channel ID and contact.
 *
 * @param array         $leads             Lead entities or arrays carrying an 'id' key
 * @param string        $channel
 * @param int           $channelId
 * @param \DateInterval $scheduledInterval added to the current time to get the scheduled date
 * @param int           $maxAttempts
 * @param int           $priority
 * @param int|null      $campaignEventId   when set, the message is linked to that campaign event
 * @param array         $options
 *
 * @return bool always true
 */
public function queue(
    $leads,
    $channel,
    $channelId,
    \DateInterval $scheduledInterval,
    $maxAttempts = 1,
    $priority = 1,
    $campaignEventId = null,
    $options = []
): bool {
    $pending = [];

    // One scheduled date is computed up front and shared by every message created in this call.
    $sendAt = new \DateTime();
    $sendAt->add($scheduledInterval);

    foreach ($leads as $contact) {
        if (is_array($contact)) {
            $contactId = $contact['id'];
        } else {
            $contactId = $contact->getId();
        }

        $existing = $this->getRepository()->findMessage($channel, $channelId, $contactId);
        if (!empty($existing)) {
            continue;
        }

        $entry = new MessageQueue();

        if ($campaignEventId) {
            $eventReference = $this->em->getReference(\Mautic\CampaignBundle\Entity\Event::class, $campaignEventId);
            $entry->setEvent($eventReference);
        }

        $entry->setChannel($channel);
        $entry->setChannelId($channelId);
        $entry->setDatePublished(new \DateTime());
        $entry->setMaxAttempts($maxAttempts);

        // Reuse the entity when we were given one; otherwise use a reference by ID.
        if ($contact instanceof Lead) {
            $entry->setLead($contact);
        } else {
            $entry->setLead($this->em->getReference(Lead::class, $contactId));
        }

        $entry->setPriority($priority);
        $entry->setScheduledDate($sendAt);
        $entry->setOptions($options);

        $pending[] = $entry;
    }

    if (!$pending) {
        return true;
    }

    $this->saveEntities($pending);
    $this->getRepository()->detachEntities($pending);

    return true;
}
/**
 * Processes a batch of queued messages and returns how many were sent successfully.
 *
 * Up to 50 messages are requested from the repository, optionally filtered by channel and
 * channel ID. Each message is processed on its own and then detached from the entity manager
 * together with its campaign event and contact.
 *
 * @param string|null $channel   passed to the repository as a filter
 * @param int|null    $channelId passed to the repository as a filter
 *
 * @return int sum of the counts returned by processMessageQueue()
 */
public function sendMessages($channel = null, $channelId = null): int
{
    // Note when the process started for batch purposes
    $processStarted = new \DateTime();
    $limit          = 50;
    $counter        = 0;

    foreach ($this->getRepository()->getQueuedMessages($limit, $processStarted, $channel, $channelId) as $queue) {
        $counter += $this->processMessageQueue($queue);

        // Release the processed entities. The event and the contact are both treated as
        // optional (processMessageQueue() also checks the contact before using it), so
        // neither is handed to detach() unless it is actually set.
        foreach ([$queue->getEvent(), $queue->getLead(), $queue] as $entity) {
            if ($entity) {
                $this->em->detach($entity);
            }
        }
    }

    return $counter;
}
/**
 * Processes one queued message or an array of them and returns how many succeeded.
 *
 * Steps: hydrate contact fields in one query, offer the messages to batch listeners grouped
 * by channel and channel ID, send whatever is still unprocessed through the single-message
 * event, update each message's status, and save the messages.
 *
 * @param MessageQueue|MessageQueue[] $queue a single message or an array of messages
 *
 * @return int number of messages that report success after processing
 *
 * @throws \InvalidArgumentException when $queue is neither an array nor a MessageQueue
 */
public function processMessageQueue($queue): int
{
    if (!is_array($queue)) {
        if (!$queue instanceof MessageQueue) {
            throw new \InvalidArgumentException('$queue must be an instance of '.MessageQueue::class);
        }

        $queue = [$queue->getId() => $queue];
    }

    $counter   = 0;
    $contacts  = [];
    $byChannel = [];

    // Lead entities will not have profile fields populated due to the custom field use - therefore to optimize resources,
    // get a list of leads to fetch details all at once along with company details for dynamic email content, etc.
    // Contact IDs are collected under the array's own keys so the lookup below works however the caller keyed $queue.
    /** @var MessageQueue $message */
    foreach ($queue as $key => $message) {
        $lead = $message->getLead();
        if ($lead) {
            $contacts[$key] = $lead->getId();
        }
    }

    if (!empty($contacts)) {
        $contactData = $this->leadModel->getRepository()->getContacts($contacts);

        foreach ($contacts as $key => $contactId) {
            // Skip contacts the repository did not return rather than reading a missing key.
            if (isset($contactData[$contactId])) {
                $queue[$key]->getLead()->setFields($contactData[$contactId]);
            }
        }
    }

    // Group queue by channel and channel ID - this make it possible for processing listeners to batch process such as
    // sending emails in batches to 3rd party transactional services via HTTP APIs
    foreach ($queue as $key => $message) {
        if (MessageQueue::STATUS_SENT == $message->getStatus()) {
            // Already sent: drop it so it is neither processed nor saved again.
            unset($queue[$key]);

            continue;
        }

        $messageChannel   = $message->getChannel();
        $messageChannelId = $message->getChannelId() ?: 0;

        $byChannel[$messageChannel][$messageChannelId][] = $message;
    }

    // First try to batch process each channel
    foreach ($byChannel as $messageChannel => $channelMessages) {
        foreach ($channelMessages as $messageChannelId => $messages) {
            $event = new MessageQueueBatchProcessEvent($messages, $messageChannel, $messageChannelId);

            // dispatchEvent() takes the entity by reference, so a variable is required even though it is unused.
            $ignore = null;
            $this->dispatchEvent('process_batch_message_queue', $ignore, false, $event);
        }
    }
    unset($byChannel);

    // Now check to see if the message was processed by the listener and if not
    // send it through a single process event listener
    foreach ($queue as $message) {
        if (!$message->isProcessed()) {
            $event = new MessageQueueProcessEvent($message);
            $this->dispatchEvent('process_message_queue', $message, false, $event);
        }

        if ($message->isSuccess()) {
            ++$counter;
            $message->setSuccess();
            $message->setLastAttempt(new \DateTime());
            $message->setDateSent(new \DateTime());
            $message->setStatus(MessageQueue::STATUS_SENT);
        } elseif ($message->isFailed()) {
            // Failure such as email delivery issue or something so retry in a short time
            $this->reschedule($message, new \DateInterval(self::DEFAULT_RESCHEDULE_INTERVAL));
        } // otherwise assume the listener did something such as rescheduling the message
    }

    $this->saveEntities($queue);

    return $counter;
}
/**
 * Pushes a queued message's scheduled date back by the given interval and marks it rescheduled.
 *
 * When $message is not a MessageQueue and $leadId, $channel and $channelId are all given, the
 * message is looked up through the repository and is always persisted. If no MessageQueue is
 * available after that, the call does nothing.
 *
 * @param MessageQueue|mixed $message            the message to reschedule, or any other value to trigger a lookup
 * @param \DateInterval      $rescheduleInterval added to the message's current scheduled date
 * @param int|null           $leadId             lookup key, used only when $message is not a MessageQueue
 * @param string|null        $channel            lookup key, used only when $message is not a MessageQueue
 * @param int|null           $channelId          lookup key, used only when $message is not a MessageQueue
 * @param bool               $persist            save the message immediately
 */
public function reschedule($message, \DateInterval $rescheduleInterval, $leadId = null, $channel = null, $channelId = null, $persist = false): void
{
    if (!$message instanceof MessageQueue && $leadId && $channel && $channelId) {
        $message = $this->getRepository()->findMessage($channel, $channelId, $leadId);
        $persist = true;
    }

    // Covers "nothing found" as well as a non-message value passed without lookup keys.
    if (!$message instanceof MessageQueue) {
        return;
    }

    $message->setAttempts($message->getAttempts() + 1);
    $message->setLastAttempt(new \DateTime());

    // Work on a copy so the date object currently held by the entity is not mutated;
    // fall back to the current time if the message has no scheduled date.
    $scheduledDate = $message->getScheduledDate();
    $rescheduleTo  = $scheduledDate ? clone $scheduledDate : new \DateTime();
    $rescheduleTo  = $rescheduleTo->add($rescheduleInterval);

    $message->setScheduledDate($rescheduleTo);
    $message->setStatus(MessageQueue::STATUS_RESCHEDULED);

    if ($persist) {
        $this->saveEntity($message);
    }

    // Mark as processed for listeners
    $message->setProcessed();
}
/**
 * Legacy wrapper around reschedule() that accepts the interval as a string.
 *
 * @deprecated to be removed in 3.0; use reschedule method instead
 *
 * @param string $rescheduleInterval interval spec without the leading 'P' (it is prepended here);
 *                                   when it loosely equals null, DEFAULT_RESCHEDULE_INTERVAL is used
 * @param bool   $persist
 */
public function rescheduleMessage($message, $rescheduleInterval = null, $leadId = null, $channel = null, $channelId = null, $persist = false): void
{
    // Loose comparison kept on purpose: it preserves which inputs fall back to the default.
    if (null == $rescheduleInterval) {
        $intervalSpec = self::DEFAULT_RESCHEDULE_INTERVAL;
    } else {
        $intervalSpec = 'P'.$rescheduleInterval;
    }

    $interval = new \DateInterval($intervalSpec);

    $this->reschedule($message, $interval, $leadId, $channel, $channelId, $persist);
}
/**
 * Returns the repository's count of queued messages for a channel.
 *
 * @param string $channel    channel name, forwarded to the repository
 * @param array  $channelIds forwarded to the repository unchanged; defaults to an empty array
 */
public function getQueuedChannelCount($channel, $channelIds = []): int
{
    $repository = $this->getRepository();

    return $repository->getQueuedChannelCount($channel, $channelIds);
}
/**
 * Dispatches the channel event that corresponds to a model action.
 *
 * Recognised actions are 'process_message_queue', 'process_batch_message_queue' and
 * 'post_save'. Any other action is ignored.
 *
 * @param string     $action
 * @param mixed      $entity passed by reference; wrapped in a MessageQueueEvent when no $event is supplied
 * @param bool       $isNew  forwarded to the MessageQueueEvent when one is created here
 * @param Event|null $event  pre-built event to dispatch instead of creating one
 *
 * @return Event|null the dispatched event, or null when the action is not recognised or has no listeners
 */
protected function dispatchEvent($action, &$entity, $isNew = false, ?Event $event = null): ?Event
{
    $name = match ($action) {
        'process_message_queue'       => ChannelEvents::PROCESS_MESSAGE_QUEUE,
        'process_batch_message_queue' => ChannelEvents::PROCESS_MESSAGE_QUEUE_BATCH,
        'post_save'                   => ChannelEvents::MESSAGE_QUEUED,
        default                       => null,
    };

    if (null === $name || !$this->dispatcher->hasListeners($name)) {
        return null;
    }

    if (null === $event) {
        $event = new MessageQueueEvent($entity, $isNew);
        $event->setEntityManager($this->em);
    }

    $this->dispatcher->dispatch($event, $name);

    return $event;
}
}