chrisbryan17's picture
Upload folder using huggingface_hub
d2897cd verified
<?php
namespace Mautic\LeadBundle\Model;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\ORMException;
use Mautic\CoreBundle\Helper\Chart\ChartQuery;
use Mautic\CoreBundle\Helper\Chart\LineChart;
use Mautic\CoreBundle\Helper\CoreParametersHelper;
use Mautic\CoreBundle\Helper\DateTimeHelper;
use Mautic\CoreBundle\Helper\InputHelper;
use Mautic\CoreBundle\Helper\PathsHelper;
use Mautic\CoreBundle\Helper\UserHelper;
use Mautic\CoreBundle\Model\FormModel;
use Mautic\CoreBundle\Model\NotificationModel;
use Mautic\CoreBundle\ProcessSignal\ProcessSignalService;
use Mautic\CoreBundle\Security\Permissions\CorePermissions;
use Mautic\CoreBundle\Translation\Translator;
use Mautic\LeadBundle\Entity\Company;
use Mautic\LeadBundle\Entity\Import;
use Mautic\LeadBundle\Entity\ImportRepository;
use Mautic\LeadBundle\Entity\LeadEventLog;
use Mautic\LeadBundle\Entity\LeadEventLogRepository;
use Mautic\LeadBundle\Event\ImportEvent;
use Mautic\LeadBundle\Event\ImportProcessEvent;
use Mautic\LeadBundle\Exception\ImportDelayedException;
use Mautic\LeadBundle\Exception\ImportFailedException;
use Mautic\LeadBundle\Helper\Progress;
use Mautic\LeadBundle\LeadEvents;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;
use Symfony\Contracts\EventDispatcher\Event;
/**
* @extends FormModel<Import>
*/
class ImportModel extends FormModel
{
protected LeadEventLogRepository $leadEventLogRepo;
public function __construct(
protected PathsHelper $pathsHelper,
protected LeadModel $leadModel,
protected NotificationModel $notificationModel,
protected CoreParametersHelper $config,
protected CompanyModel $companyModel,
EntityManagerInterface $em,
CorePermissions $security,
EventDispatcherInterface $dispatcher,
UrlGeneratorInterface $router,
Translator $translator,
UserHelper $userHelper,
LoggerInterface $mauticLogger,
private ProcessSignalService $processSignalService
) {
$this->leadEventLogRepo = $leadModel->getEventLogRepository();
parent::__construct($em, $security, $dispatcher, $router, $translator, $userHelper, $mauticLogger, $config);
}
/**
* Returns the Import entity which should be processed next.
*
* @return Import|null
*/
public function getImportToProcess()
{
$result = $this->getRepository()->getImportsWithStatuses([Import::QUEUED, Import::DELAYED], 1);
if (isset($result[0]) && $result[0] instanceof Import) {
return $result[0];
}
return null;
}
/**
* Compares current number of imports in progress with the limit from the configuration.
*/
public function checkParallelImportLimit(): bool
{
$parallelImportLimit = $this->getParallelImportLimit();
$importsInProgress = $this->getRepository()->countImportsInProgress();
return !($importsInProgress >= $parallelImportLimit);
}
/**
* Returns parallel import limit from the configuration.
*
* @param int $default
*
* @return int
*/
public function getParallelImportLimit($default = 1)
{
return $this->config->get('parallel_import_limit', $default);
}
/**
* Generates a HTML link to the import detail.
*/
public function generateLink(Import $import): string
{
return '<a href="'.$this->router->generate(
'mautic_import_action',
['objectAction' => 'view', 'object' => 'lead', 'objectId' => $import->getId()]
).'" data-toggle="ajax">'.$import->getOriginalFile().' ('.$import->getId().')</a>';
}
/**
* Check if there are some IN_PROGRESS imports which got stuck for a while.
* Set those as failed.
*/
public function setGhostImportsAsFailed()
{
$ghostDelay = 2;
$imports = $this->getRepository()->getGhostImports($ghostDelay, 5);
if (empty($imports)) {
return null;
}
foreach ($imports as $import) {
$import->setStatus($import::FAILED)
->setStatusInfo($this->translator->trans('mautic.lead.import.ghost.limit.hit', ['%limit%' => $ghostDelay]))
->removeFile();
if ($import->getCreatedBy()) {
$this->notificationModel->addNotification(
$this->translator->trans(
'mautic.lead.import.result.info',
['%import%' => $this->generateLink($import)]
),
'info',
false,
$this->translator->trans('mautic.lead.import.failed'),
'ri-download-line',
null,
$this->em->getReference(\Mautic\UserBundle\Entity\User::class, $import->getCreatedBy())
);
}
}
$this->saveEntities($imports);
}
/**
* Start import. This is meant for the CLI command since it will import
* the whole file at once.
*
* @param int $limit Number of records to import before delaying the import. 0 will import all
*
* @throws ImportFailedException
* @throws ImportDelayedException
*/
public function beginImport(Import $import, Progress $progress, $limit = 0): void
{
$this->setGhostImportsAsFailed();
if (!$import) {
$msg = 'import is empty, closing the import process';
$this->logDebug($msg, $import);
throw new ImportFailedException($msg);
}
if (!$import->canProceed()) {
$this->saveEntity($import);
$msg = 'import cannot be processed because '.$import->getStatusInfo();
$this->logDebug($msg, $import);
throw new ImportFailedException($msg);
}
if (!$this->checkParallelImportLimit()) {
$info = $this->translator->trans(
'mautic.lead.import.parallel.limit.hit',
['%limit%' => $this->getParallelImportLimit()]
);
$import->setStatus($import::DELAYED)->setStatusInfo($info);
$this->saveEntity($import);
$msg = 'import is delayed because parrallel limit was hit. '.$import->getStatusInfo();
$this->logDebug($msg, $import);
throw new ImportDelayedException($msg);
}
$processed = $import->getProcessedRows();
$total = $import->getLineCount();
$pending = $total - $processed;
if ($limit && $limit < $pending) {
$processed = 0;
$total = $limit;
}
$progress->setTotal($total);
$progress->setDone($processed);
$import->start();
// Save the start changes so the user could see it
$this->saveEntity($import);
$this->logDebug('The background import is about to start', $import);
try {
if (!$this->process($import, $progress, $limit)) {
throw new ImportFailedException($import->getStatusInfo());
}
} catch (ORMException $e) {
// The EntityManager is probably closed. The entity cannot be saved.
$info = $this->translator->trans(
'mautic.lead.import.database.exception',
['%message%' => $e->getMessage()]
);
$import->setStatus($import::DELAYED)->setStatusInfo($info);
throw new ImportFailedException('Database had been overloaded');
}
$import->end();
$this->logDebug('The background import has ended', $import);
// Save the end changes so the user could see it
$this->saveEntity($import);
if ($import->getCreatedBy()) {
$this->notificationModel->addNotification(
$this->translator->trans(
'mautic.lead.import.result.info',
['%import%' => $this->generateLink($import)]
),
'info',
false,
$this->translator->trans('mautic.lead.import.completed'),
'ri-download-line',
null,
$this->em->getReference(\Mautic\UserBundle\Entity\User::class, $import->getCreatedBy())
);
}
}
/**
* Import the CSV file from configuration in the $import entity.
*
* @param int $limit Number of records to import before delaying the import
*/
public function process(Import $import, Progress $progress, $limit = 0): bool
{
try {
$file = new \SplFileObject($import->getFilePath());
} catch (\Exception $e) {
$import->setStatusInfo('SplFileObject cannot read the file. '.$e->getMessage());
$import->setStatus(Import::FAILED);
$this->logDebug('import cannot be processed because '.$import->getStatusInfo(), $import);
return false;
}
$lastImportedLine = $import->getLastLineImported();
$headers = $import->getHeaders();
$headerCount = count($headers);
$config = $import->getParserConfig();
$counter = 0;
$file->seek($lastImportedLine);
$lineNumber = $lastImportedLine + 1;
$this->logDebug('The import is starting on line '.$lineNumber, $import);
$batchSize = $config['batchlimit'];
// Convert to field names
array_walk($headers, function (&$val): void {
$val = strtolower(InputHelper::alphanum($val, false, '_'));
});
while ($batchSize && !$file->eof()) {
$string = $file->current();
$file->next();
$data = str_getcsv($string, $config['delimiter'], $config['enclosure'], $config['escape']);
$import->setLastLineImported($lineNumber);
// Ignore the header row
if (1 === $lineNumber) {
++$lineNumber;
continue;
}
// Ensure the progress is changing
++$lineNumber;
--$batchSize;
$progress->increase();
$errorMessage = null;
$eventLog = $this->initEventLog($import, $lineNumber);
if ($this->isEmptyCsvRow($data)) {
$errorMessage = 'mautic.lead.import.error.line_empty';
}
if ($this->hasMoreValuesThanColumns($data, $headerCount)) {
$errorMessage = 'mautic.lead.import.error.header_mismatch';
}
if (!$errorMessage) {
$data = $this->trimArrayValues($data);
if (!array_filter($data)) {
continue;
}
$data = array_combine($headers, $data);
try {
$event = new ImportProcessEvent($import, $eventLog, $data);
$this->dispatcher->dispatch($event, LeadEvents::IMPORT_ON_PROCESS);
if ($event->wasMerged()) {
$this->logDebug('Entity on line '.$lineNumber.' has been updated', $import);
$import->increaseUpdatedCount();
} else {
$this->logDebug('Entity on line '.$lineNumber.' has been created', $import);
$import->increaseInsertedCount();
}
} catch (\Exception $e) {
// Email validation likely failed
$errorMessage = $e->getMessage();
}
}
if ($errorMessage) {
// Log the error first
$import->increaseIgnoredCount();
$this->logDebug('Line '.$lineNumber.' error: '.$errorMessage, $import);
if (!$this->em->isOpen()) {
// Something bad must have happened if the entity manager is closed.
// We will not be able to save any entities.
throw new ORMException($errorMessage);
}
// This should be called only if the entity manager is open
$this->logImportRowError($eventLog, $errorMessage);
} else {
$this->leadEventLogRepo->saveEntity($eventLog);
}
// Release entities in Doctrine's memory to prevent memory leak
$this->em->detach($eventLog);
if (null !== $leadEntity = $eventLog->getLead()) {
$this->em->detach($leadEntity);
$company = $leadEntity->getCompany();
$primaryCompany = $leadEntity->getPrimaryCompany();
if ($company instanceof Company) {
$this->em->detach($company);
}
if ($primaryCompany instanceof Company) {
$this->em->detach($primaryCompany);
}
}
$eventLog = null;
$data = null;
// Save Import entity once per batch so the user could see the progress
if (0 === $batchSize && $import->isBackgroundProcess()) {
$isPublished = $this->getRepository()->getValue($import->getId(), 'is_published');
if (!$isPublished) {
$import->setStatus($import::STOPPED);
}
$this->saveEntity($import);
$this->dispatchEvent('batch_processed', $import);
// Stop the import loop if the import got unpublished
if (!$isPublished) {
$this->logDebug('The import has been unpublished. Stopping the import now.', $import);
break;
}
$batchSize = $config['batchlimit'];
}
if ($this->processSignalService->isSignalCaught()) {
break;
}
++$counter;
if ($limit && $counter >= $limit) {
break;
}
}
if ($import->getLastLineImported() < $import->getLineCount()) {
$import->setStatus($import::DELAYED);
$this->saveEntity($import);
}
// Close the file
$file = null;
return true;
}
/**
* Check if the CSV row has more values than the CSV header has columns.
* If it is less, generate empty values for the rest of the missing values.
* If it is more, return true.
*
* @param int $headerCount
*/
public function hasMoreValuesThanColumns(array &$data, $headerCount): bool
{
$dataCount = count($data);
if ($headerCount !== $dataCount) {
$diffCount = ($headerCount - $dataCount);
if ($diffCount > 0) {
// Fill in the data with empty string
$fill = array_fill($dataCount, $diffCount, '');
$data = $data + $fill;
} else {
return true;
}
}
return false;
}
/**
* Trim all values in a one dymensional array.
*/
public function trimArrayValues(array $data): array
{
return array_map('trim', $data);
}
/**
* Decide whether the CSV row is empty.
*
* @param mixed $row
*/
public function isEmptyCsvRow($row): bool
{
if (!is_array($row) || empty($row)) {
return true;
}
if (1 === count($row) && ('' === $row[0] || null === $row[0])) {
return true;
}
return !array_filter($row);
}
/**
* Save log about errored line.
*
* @param string $errorMessage
*/
public function logImportRowError(LeadEventLog $eventLog, $errorMessage): void
{
$eventLog->addProperty('error', $this->translator->trans($errorMessage))
->setAction('failed');
$this->leadEventLogRepo->saveEntity($eventLog);
}
/**
* Initialize LeadEventLog object and configure it as the import event.
*
* @param int $lineNumber
*/
public function initEventLog(Import $import, $lineNumber): LeadEventLog
{
$eventLog = new LeadEventLog();
$eventLog->setUserId($import->getCreatedBy())
->setUserName($import->getCreatedByUser())
->setBundle($import->getObject())
->setObject('import')
->setObjectId($import->getId())
->setProperties(
[
'line' => $lineNumber,
'file' => $import->getOriginalFile(),
]
);
return $eventLog;
}
/**
* Get line chart data of imported rows.
*
* @param string $unit {@link php.net/manual/en/function.date.php#refsect1-function.date-parameters}
* @param string $dateFormat
* @param array $filter
*/
public function getImportedRowsLineChartData($unit, \DateTimeInterface $dateFrom, \DateTimeInterface $dateTo, $dateFormat = null, $filter = []): array
{
$filter['object'] = 'import';
$filter['bundle'] = 'lead';
// Clear the times for display by minutes
/** @var \DateTime $dateFrom */
/** @var \DateTime $dateTo */
$dateFrom->modify('-1 minute');
$dateFrom->setTime($dateFrom->format('H'), $dateFrom->format('i'), 0);
$dateTo->modify('+1 minute');
$dateTo->setTime($dateTo->format('H'), $dateTo->format('i'), 0);
$query = new ChartQuery($this->em->getConnection(), $dateFrom, $dateTo, $unit);
$chart = new LineChart($unit, $dateFrom, $dateTo, $dateFormat);
$data = $query->fetchTimeData('lead_event_log', 'date_added', $filter);
$chart->setDataset($this->translator->trans('mautic.lead.import.processed.rows'), $data);
return $chart->render();
}
/**
* Returns a list of failed rows for the import.
*
* @param int $importId
* @param string $object
*
* @return array|null
*/
public function getFailedRows($importId = null, $object = 'lead')
{
if (!$importId) {
return null;
}
return $this->getEventLogRepository()->getFailedRows($importId, ['select' => 'properties,id'], $object);
}
/**
* @return ImportRepository
*/
public function getRepository()
{
return $this->em->getRepository(Import::class);
}
/**
* @return LeadEventLogRepository
*/
public function getEventLogRepository()
{
return $this->em->getRepository(LeadEventLog::class);
}
public function getPermissionBase(): string
{
return 'lead:imports';
}
/**
* Returns a unique name of a CSV file based on time.
*/
public function getUniqueFileName(): string
{
return (new DateTimeHelper())->toUtcString('YmdHis').'.csv';
}
/**
* Returns a full path to the import dir.
*/
public function getImportDir(): string
{
return $this->pathsHelper->getImportLeadsPath();
}
/**
* Get a specific entity or generate a new one if id is empty.
*/
public function getEntity($id = null): ?Import
{
if (null === $id) {
return new Import();
}
return parent::getEntity($id);
}
/**
* @throws MethodNotAllowedHttpException
*/
protected function dispatchEvent($action, &$entity, $isNew = false, Event $event = null): ?Event
{
if (!$entity instanceof Import) {
throw new MethodNotAllowedHttpException(['Import']);
}
switch ($action) {
case 'pre_save':
$name = LeadEvents::IMPORT_PRE_SAVE;
break;
case 'post_save':
$name = LeadEvents::IMPORT_POST_SAVE;
break;
case 'pre_delete':
$name = LeadEvents::IMPORT_PRE_DELETE;
break;
case 'post_delete':
$name = LeadEvents::IMPORT_POST_DELETE;
break;
case 'batch_processed':
$name = LeadEvents::IMPORT_BATCH_PROCESSED;
break;
default:
return null;
}
if ($this->dispatcher->hasListeners($name)) {
if (empty($event)) {
$event = new ImportEvent($entity, $isNew);
$event->setEntityManager($this->em);
}
$this->dispatcher->dispatch($event, $name);
return $event;
} else {
return null;
}
}
/**
* Logs a debug message if in dev environment.
*
* @param string $msg
*/
protected function logDebug($msg, Import $import = null)
{
if (MAUTIC_ENV === 'dev') {
$importId = $import ? '('.$import->getId().')' : '';
$this->logger->debug(sprintf('IMPORT%s: %s', $importId, $msg));
}
}
}