vendor/w-vision/data-definitions/src/DataDefinitionsBundle/Importer/Importer.php line 249

Open in your IDE?
  1. <?php
  2. /**
  3.  * Data Definitions.
  4.  *
  5.  * LICENSE
  6.  *
  7.  * This source file is subject to the GNU General Public License version 3 (GPLv3)
  8.  * For the full copyright and license information, please view the LICENSE.md and gpl-3.0.txt
  9.  * files that are distributed with this source code.
  10.  *
  11.  * @copyright  Copyright (c) 2016-2019 w-vision AG (https://www.w-vision.ch)
  12.  * @license    https://github.com/w-vision/DataDefinitions/blob/master/gpl-3.0.txt GNU General Public License version 3 (GPLv3)
  13.  */
  14. declare(strict_types=1);
  15. namespace Wvision\Bundle\DataDefinitionsBundle\Importer;
  16. use CoreShop\Component\Registry\ServiceRegistryInterface;
  17. use Countable;
  18. use InvalidArgumentException;
  19. use Pimcore;
  20. use Pimcore\File;
  21. use Pimcore\Mail;
  22. use Pimcore\Model\DataObject\ClassDefinition;
  23. use Pimcore\Model\DataObject\Concrete;
  24. use Pimcore\Model\DataObject\Service;
  25. use Pimcore\Model\Document;
  26. use Pimcore\Model\Factory;
  27. use Pimcore\Model\Version;
  28. use Psr\Log\LoggerAwareInterface;
  29. use Psr\Log\LoggerInterface;
  30. use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
  31. use Symfony\Component\Messenger\MessageBusInterface;
  32. use Throwable;
  33. use Wvision\Bundle\DataDefinitionsBundle\Context\ContextFactoryInterface;
  34. use Wvision\Bundle\DataDefinitionsBundle\Event\EventDispatcherInterface;
  35. use Wvision\Bundle\DataDefinitionsBundle\Exception\DoNotSetException;
  36. use Wvision\Bundle\DataDefinitionsBundle\Exception\UnexpectedValueException;
  37. use Wvision\Bundle\DataDefinitionsBundle\Filter\FilterInterface;
  38. use Wvision\Bundle\DataDefinitionsBundle\Interpreter\InterpreterInterface;
  39. use Wvision\Bundle\DataDefinitionsBundle\Loader\LoaderInterface;
  40. use Wvision\Bundle\DataDefinitionsBundle\Messenger\ImportRowMessage;
  41. use Wvision\Bundle\DataDefinitionsBundle\Model\ImportDefinitionInterface;
  42. use Wvision\Bundle\DataDefinitionsBundle\Model\ImportMapping;
  43. use Wvision\Bundle\DataDefinitionsBundle\Model\ParamsAwareInterface;
  44. use Wvision\Bundle\DataDefinitionsBundle\Persister\PersisterInterface;
  45. use Wvision\Bundle\DataDefinitionsBundle\Provider\ArrayImportDataSet;
  46. use Wvision\Bundle\DataDefinitionsBundle\Provider\ImportDataSet;
  47. use Wvision\Bundle\DataDefinitionsBundle\Provider\ImportDataSetInterface;
  48. use Wvision\Bundle\DataDefinitionsBundle\Provider\ImportProviderInterface;
  49. use Wvision\Bundle\DataDefinitionsBundle\Runner\ImportStartFinishRunnerInterface;
  50. use Wvision\Bundle\DataDefinitionsBundle\Runner\RunnerInterface;
  51. use Wvision\Bundle\DataDefinitionsBundle\Runner\SaveRunnerInterface;
  52. use Wvision\Bundle\DataDefinitionsBundle\Runner\SetterRunnerInterface;
  53. use Wvision\Bundle\DataDefinitionsBundle\Setter\SetterInterface;
  54. final class Importer implements ImporterInterfaceAsyncImporterInterface
  55. {
  56.     private bool $shouldStop false;
  57.     public function __construct(
  58.         private ServiceRegistryInterface $providerRegistry,
  59.         private ServiceRegistryInterface $filterRegistry,
  60.         private ServiceRegistryInterface $runnerRegistry,
  61.         private ServiceRegistryInterface $interpreterRegistry,
  62.         private ServiceRegistryInterface $setterRegistry,
  63.         private ServiceRegistryInterface $cleanerRegistry,
  64.         private ServiceRegistryInterface $loaderRegistry,
  65.         private ServiceRegistryInterface $persisterRegistry,
  66.         private EventDispatcherInterface $eventDispatcher,
  67.         private ContextFactoryInterface $contextFactory,
  68.         private LoggerInterface $logger,
  69.         private Factory $modelFactory,
  70.         private ExpressionLanguage $expressionLanguage,
  71.         private MessageBusInterface $bus,
  72.     ) {
  73.     }
  74.     public function doImportRowAsync(ImportDefinitionInterface $definition, array $row, array $params): void
  75.     {
  76.         if ($definition->getCreateVersion()) {
  77.             Version::enable();
  78.         } else {
  79.             Version::disable();
  80.         }
  81.         $dataSet = new ArrayImportDataSet($row);
  82.         $runner null;
  83.         $runnerContext $this->contextFactory->createRunnerContext($definition$params$row$dataSetnull);
  84.         if ($definition->getRunner()) {
  85.             /**
  86.              * @var RunnerInterface $runner
  87.              */
  88.             $runner $this->runnerRegistry->get($definition->getRunner());
  89.         }
  90.         if ($runner instanceof ImportStartFinishRunnerInterface) {
  91.             $runner->startImport($runnerContext);
  92.         }
  93.         $filter null;
  94.         $filterType $definition->getFilter();
  95.         if ($filterType) {
  96.             /**
  97.              * @var FilterInterface $filter
  98.              */
  99.             $filter $this->filterRegistry->get($filterType);
  100.         }
  101.         $object $this->importRow(
  102.             $definition,
  103.             $row,
  104.             $dataSet,
  105.             $params,
  106.             $filter,
  107.             $runner
  108.         );
  109.     }
  110.     public function doImportAsync(ImportDefinitionInterface $definition, array $params): void
  111.     {
  112.         /** @var ImportDataSetInterface|array $data */
  113.         $data $this->getData($definition$params);
  114.         foreach ($data as $row) {
  115.             $this->bus->dispatch(
  116.                 new ImportRowMessage(
  117.                     $definition->getId(),
  118.                     $row,
  119.                     $params,
  120.                 )
  121.             );
  122.         }
  123.     }
  124.     public function doImport(ImportDefinitionInterface $definition$params): array
  125.     {
  126.         $filter null;
  127.         if ($definition->getCreateVersion()) {
  128.             Version::enable();
  129.         } else {
  130.             Version::disable();
  131.         }
  132.         $filterType $definition->getFilter();
  133.         if ($filterType) {
  134.             /**
  135.              * @var FilterInterface $filter
  136.              */
  137.             $filter $this->filterRegistry->get($filterType);
  138.         }
  139.         /** @var ImportDataSetInterface|array $data */
  140.         $data $this->getData($definition$params);
  141.         $runner null;
  142.         $runnerContext $this->contextFactory->createRunnerContext($definition$paramsnull$datanull);
  143.         if ($definition->getRunner()) {
  144.             /**
  145.              * @var RunnerInterface $runner
  146.              */
  147.             $runner $this->runnerRegistry->get($definition->getRunner());
  148.         }
  149.         if ((\is_countable($data) || $data instanceof Countable) && ($count \count($data)) > 0) {
  150.             $this->eventDispatcher->dispatch($definition'data_definitions.import.total'$count$params);
  151.         }
  152.         if ($runner instanceof ImportStartFinishRunnerInterface) {
  153.             $runner->startImport($runnerContext);
  154.         }
  155.         [$objectIds$exceptions] = $this->runImport($definition$params$filter$runner$data);
  156.         if ($runner instanceof ImportStartFinishRunnerInterface) {
  157.             $runner->finishImport($runnerContext);
  158.         }
  159.         $cleanerType $definition->getCleaner();
  160.         if ($cleanerType) {
  161.             $cleaner $this->cleanerRegistry->get($cleanerType);
  162.             $this->logger->info(sprintf('Running Cleaner "%s"'$cleanerType));
  163.             $this->eventDispatcher->dispatch(
  164.                 $definition,
  165.                 'data_definitions.import.status',
  166.                 sprintf('Running Cleaner "%s"'$cleanerType)
  167.             );
  168.             if ($cleaner instanceof ParamsAwareInterface) {
  169.                 $cleaner->setParams($params);
  170.             }
  171.             if ($cleaner instanceof LoggerAwareInterface) {
  172.                 $cleaner->setLogger($this->logger);
  173.             }
  174.             $cleaner->cleanup($definition$objectIds);
  175.             $this->logger->info(sprintf('Finished Cleaner "%s"'$cleanerType));
  176.             $this->eventDispatcher->dispatch(
  177.                 $definition,
  178.                 'data_definitions.import.status',
  179.                 sprintf('Finished Cleaner "%s"'$cleanerType)
  180.             );
  181.         }
  182.         if (count($exceptions) > 0) {
  183.             $this->processFailedImport($definition$params$objectIds$exceptions);
  184.         } else {
  185.             $this->processSuccessfullImport($definition$params$objectIds$exceptions);
  186.         }
  187.         $this->eventDispatcher->dispatch($definition'data_definitions.import.finished'''$params);
  188.         return $objectIds;
  189.     }
  190.     public function processSuccessfullImport(ImportDefinitionInterface $definition$params$objectIds$exceptions)
  191.     {
  192.         $this->sendDocument(
  193.             $definition,
  194.             Document::getById($definition->getSuccessNotificationDocument()),
  195.             $objectIds,
  196.             $exceptions
  197.         );
  198.         $this->eventDispatcher->dispatch($definition'data_definitions.import.success'$params);
  199.     }
  200.     public function processFailedImport(ImportDefinitionInterface $definition$params$objectIds$exceptions)
  201.     {
  202.         $this->sendDocument(
  203.             $definition,
  204.             Document::getById($definition->getFailureNotificationDocument()),
  205.             $objectIds,
  206.             $exceptions
  207.         );
  208.         $this->eventDispatcher->dispatch($definition'data_definitions.import.failure'$params);
  209.     }
  210.     public function stop(): void
  211.     {
  212.         $this->shouldStop true;
  213.     }
  214.     private function sendDocument(
  215.         ImportDefinitionInterface $definition,
  216.         ?Document $document,
  217.         array $objectIds,
  218.         array $exceptions
  219.     ) {
  220.         if ($document instanceof Document) {
  221.             $params = [
  222.                 'exceptions' => $exceptions,
  223.                 'objectIds' => $objectIds,
  224.                 'className' => $definition->getClass(),
  225.                 'countObjects' => count($objectIds),
  226.                 'countExceptions' => count($exceptions),
  227.                 'name' => $definition->getName(),
  228.                 'provider' => $definition->getProvider(),
  229.             ];
  230.             if ($document instanceof Document\Email) {
  231.                 $mail = new Mail();
  232.                 $mail->setDocument($document);
  233.                 $mail->setParams($params);
  234.                 $mail->send();
  235.             }
  236.         }
  237.     }
  238.     private function getData(ImportDefinitionInterface $definition, array $params)
  239.     {
  240.         /** @var ImportProviderInterface $provider */
  241.         $provider $this->providerRegistry->get($definition->getProvider());
  242.         return $provider->getData($definition->getConfiguration(), $definition$params);
  243.     }
  244.     private function runImport(
  245.         ImportDefinitionInterface $definition,
  246.         array $params,
  247.         FilterInterface $filter null,
  248.         RunnerInterface $runner null,
  249.         ImportDataSetInterface $dataSet null,
  250.     ): array {
  251.         if (null === $dataSet) {
  252.             $dataSet = new ImportDataSet(new \EmptyIterator());
  253.         }
  254.         $count 0;
  255.         $countToClean 50;
  256.         $objectIds = [];
  257.         $exceptions = [];
  258.         foreach ($dataSet as $row) {
  259.             if ($row === null) {
  260.                 continue;
  261.             }
  262.             try {
  263.                 $object $this->importRow(
  264.                     $definition,
  265.                     $row,
  266.                     $dataSet,
  267.                     array_merge($params, ['row' => $count]),
  268.                     $filter,
  269.                     $runner
  270.                 );
  271.                 if ($object instanceof Concrete) {
  272.                     $objectIds[] = $object->getId();
  273.                 }
  274.             } catch (Throwable $ex) {
  275.                 $this->logger->error($ex);
  276.                 $exceptions[] = $ex;
  277.                 $this->eventDispatcher->dispatch(
  278.                     $definition,
  279.                     'data_definitions.import.failure',
  280.                     sprintf('Error: %s'$ex->getMessage()),
  281.                     $params
  282.                 );
  283.                 if ($definition->getStopOnException()) {
  284.                     throw $ex;
  285.                 }
  286.             } finally {
  287.                 if (($count 1) % $countToClean === 0) {
  288.                     Pimcore::collectGarbage();
  289.                     $this->logger->info('Clean Garbage');
  290.                     $this->eventDispatcher->dispatch(
  291.                         $definition,
  292.                         'data_definitions.import.status',
  293.                         'Collect Garbage',
  294.                         $params
  295.                     );
  296.                 }
  297.                 $count++;
  298.             }
  299.             $this->eventDispatcher->dispatch($definition'data_definitions.import.progress'''$params);
  300.             if ($this->shouldStop) {
  301.                 $this->eventDispatcher->dispatch(
  302.                     $definition,
  303.                     'data_definitions.import.status',
  304.                     'Process has been stopped.'
  305.                 );
  306.                 return [$objectIds$exceptions];
  307.             }
  308.         }
  309.         return [$objectIds$exceptions];
  310.     }
  311.     private function importRow(
  312.         ImportDefinitionInterface $definition,
  313.         array $data,
  314.         ImportDataSetInterface $dataSet,
  315.         array $params,
  316.         FilterInterface $filter null,
  317.         RunnerInterface $runner null,
  318.     ): ?Concrete {
  319.         $object $this->getObject($definition$data$dataSet$params);
  320.         if (null !== $object && !$object->getId()) {
  321.             if ($definition->getSkipNewObjects()) {
  322.                 $this->eventDispatcher->dispatch(
  323.                     $definition,
  324.                     'data_definitions.import.status',
  325.                     'Ignoring new Object',
  326.                     $params
  327.                 );
  328.                 return null;
  329.             }
  330.         } else {
  331.             if ($definition->getSkipExistingObjects()) {
  332.                 $this->eventDispatcher->dispatch(
  333.                     $definition,
  334.                     'data_definitions.import.status',
  335.                     'Ignoring existing Object',
  336.                     $params
  337.                 );
  338.                 return null;
  339.             }
  340.         }
  341.         if ($filter instanceof FilterInterface) {
  342.             if ($filter instanceof LoggerAwareInterface) {
  343.                 $filter->setLogger($this->logger);
  344.             }
  345.             $context $this->contextFactory->createFilterContext($definition$params$data$dataSet$object);
  346.             if (!$filter->filter($context)) {
  347.                 $this->eventDispatcher->dispatch(
  348.                     $definition,
  349.                     'data_definitions.import.status',
  350.                     'Filtered Object',
  351.                     $params
  352.                 );
  353.                 return null;
  354.             }
  355.         }
  356.         $this->eventDispatcher->dispatch(
  357.             $definition,
  358.             'data_definitions.import.status',
  359.             sprintf('Import Object %s', ($object->getId() ? $object->getFullPath() : 'new')),
  360.             $params
  361.         );
  362.         $this->eventDispatcher->dispatch(
  363.             $definition,
  364.             'data_definitions.import.object.start',
  365.             $object,
  366.             $params
  367.         );
  368.         $runnerContext $this->contextFactory->createRunnerContext($definition$params$data$dataSet$object);
  369.         if ($runner instanceof RunnerInterface) {
  370.             if ($runner instanceof LoggerAwareInterface) {
  371.                 $runner->setLogger($this->logger);
  372.             }
  373.             $runner->preRun($runnerContext);
  374.         }
  375.         $this->logger->info(sprintf('Imported Object: %s'$object->getRealFullPath()));
  376.         /** @var ImportMapping $mapItem */
  377.         foreach ($definition->getMapping() as $mapItem) {
  378.             $value null;
  379.             if (array_key_exists($mapItem->getFromColumn(), $data) || $mapItem->getFromColumn() === "custom") {
  380.                 $value $data[$mapItem->getFromColumn()] ?? null;
  381.                 $this->setObjectValue($object$mapItem$value$data$dataSet$definition$params$runner);
  382.             }
  383.         }
  384.         $shouldSave true;
  385.         if ($runner instanceof SaveRunnerInterface) {
  386.             if ($runner instanceof LoggerAwareInterface) {
  387.                 $runner->setLogger($this->logger);
  388.             }
  389.             $shouldSave $runner->shouldSaveObject($runnerContext);
  390.         }
  391.         if ($shouldSave) {
  392.             $params['versionNote'] = sprintf('%s - %s'$definition->getId(), $definition->getName());
  393.             $object->setUserModification($params['userId'] ?? 0);
  394.             $object->setOmitMandatoryCheck($definition->getOmitMandatoryCheck());
  395.             $this->saveObject($object$definition$params);
  396.             $this->eventDispatcher->dispatch(
  397.                 $definition,
  398.                 'data_definitions.import.status',
  399.                 sprintf('Imported Object %s'$object->getFullPath()),
  400.                 $params
  401.             );
  402.         } else {
  403.             $this->eventDispatcher->dispatch(
  404.                 $definition,
  405.                 'data_definitions.import.status',
  406.                 sprintf('Skipped Object %s'$object->getFullPath()),
  407.                 $params
  408.             );
  409.         }
  410.         $this->eventDispatcher->dispatch(
  411.             $definition,
  412.             'data_definitions.import.status',
  413.             sprintf('Imported Object %s'$object->getFullPath()),
  414.             $params
  415.         );
  416.         $this->eventDispatcher->dispatch(
  417.             $definition,
  418.             'data_definitions.import.object.finished',
  419.             $object,
  420.             $params
  421.         );
  422.         if ($runner instanceof RunnerInterface) {
  423.             if ($runner instanceof LoggerAwareInterface) {
  424.                 $runner->setLogger($this->logger);
  425.             }
  426.             $runner->postRun($runnerContext);
  427.         }
  428.         return $object;
  429.     }
  430.     private function setObjectValue(
  431.         Concrete $object,
  432.         ImportMapping $map,
  433.         $value,
  434.         array $data,
  435.         ImportDataSetInterface $dataSet,
  436.         ImportDefinitionInterface $definition,
  437.         array $params,
  438.         RunnerInterface $runner null
  439.     ): void {
  440.         if ($map->getInterpreter()) {
  441.             try {
  442.                 $interpreter $this->interpreterRegistry->get($map->getInterpreter());
  443.                 if (!$interpreter instanceof InterpreterInterface) {
  444.                     return;
  445.                 }
  446.                 if ($interpreter instanceof LoggerAwareInterface) {
  447.                     $interpreter->setLogger($this->logger);
  448.                 }
  449.                 try {
  450.                     $context $this->contextFactory->createInterpreterContext(
  451.                         $definition,
  452.                         $params,
  453.                         $map->getInterpreterConfig() ?? [],
  454.                         $data,
  455.                         $dataSet,
  456.                         $object,
  457.                         $value,
  458.                         $map
  459.                     );
  460.                     $value $interpreter->interpret($context);
  461.                 } catch (UnexpectedValueException $ex) {
  462.                     $this->logger->info(
  463.                         sprintf(
  464.                             'Unexpected Value from Interpreter "%s" with message "%s"',
  465.                             $map->getInterpreter(),
  466.                             $ex->getMessage()
  467.                         )
  468.                     );
  469.                 }
  470.             } catch (DoNotSetException $ex) {
  471.                 return;
  472.             }
  473.         }
  474.         if ($map->getToColumn() === 'o_type' && $map->getSetter() !== 'object_type') {
  475.             throw new InvalidArgumentException('Type has to be used with ObjectType Setter!');
  476.         }
  477.         $shouldSetField true;
  478.         if ($runner instanceof SetterRunnerInterface) {
  479.             if ($runner instanceof LoggerAwareInterface) {
  480.                 $runner->setLogger($this->logger);
  481.             }
  482.             $shouldSetField $runner->shouldSetField($object$map$value$data$definition$params);
  483.         }
  484.         if (!$shouldSetField) {
  485.             return;
  486.         }
  487.         if ($map->getSetter()) {
  488.             $setter $this->setterRegistry->get($map->getSetter());
  489.             $setterContext $this->contextFactory->createSetterContext(
  490.                 $definition,
  491.                 $params,
  492.                 $object,
  493.                 $map,
  494.                 $data,
  495.                 $dataSet,
  496.                 $value
  497.             );
  498.             if ($setter instanceof SetterInterface) {
  499.                 if ($setter instanceof LoggerAwareInterface) {
  500.                     $setter->setLogger($this->logger);
  501.                 }
  502.                 $setter->set($setterContext);
  503.             }
  504.         } else {
  505.             $object->setValue($map->getToColumn(), $value);
  506.         }
  507.     }
  508.     private function getObject(
  509.         ImportDefinitionInterface $definition,
  510.         $data,
  511.         ImportDataSetInterface $dataSet,
  512.         $params
  513.     ): Concrete {
  514.         $class $definition->getClass();
  515.         $classObject '\Pimcore\Model\DataObject\\'.ucfirst($class);
  516.         $classDefinition ClassDefinition::getByName($class);
  517.         if (!$classDefinition instanceof ClassDefinition) {
  518.             throw new InvalidArgumentException(sprintf('Class not found %s'$class));
  519.         }
  520.         /**
  521.          * @var $loader LoaderInterface
  522.          */
  523.         if ($definition->getLoader()) {
  524.             $loader $this->loaderRegistry->get($definition->getLoader());
  525.         } else {
  526.             $loader $this->loaderRegistry->get('primary_key');
  527.         }
  528.         $loaderContext $this->contextFactory->createLoaderContext($definition$params$data$dataSet$class);
  529.         $obj $loader->load($loaderContext);
  530.         if (null === $obj) {
  531.             $classImplementation $this->modelFactory->getClassNameFor($classObject) ?? $classObject;
  532.             $obj = new $classImplementation();
  533.         }
  534.         $key Service::getValidKey($this->createKey($definition$data), 'object');
  535.         if ($definition->getRelocateExistingObjects() || !$obj->getId()) {
  536.             $obj->setParent(Service::createFolderByPath($this->createPath($definition$data)));
  537.         }
  538.         if ($definition->getRenameExistingObjects() || !$obj->getId()) {
  539.             if ($key && $definition->getKey()) {
  540.                 $obj->setKey($key);
  541.             } else {
  542.                 $obj->setKey(File::getValidFilename(uniqid(''true)));
  543.             }
  544.         }
  545.         if (!$obj->getKey()) {
  546.             throw new InvalidArgumentException('No key set, please check your import-data');
  547.         }
  548.         $obj->setKey(Service::getUniqueKey($obj));
  549.         return $obj;
  550.     }
  551.     private function createPath(ImportDefinitionInterface $definition, array $data): string
  552.     {
  553.         if (!$definition->getObjectPath()) {
  554.             return '';
  555.         }
  556.         if (str_starts_with($definition->getObjectPath(), '@')) {
  557.             return $this->expressionLanguage->evaluate(substr($definition->getObjectPath(), 1), $data);
  558.         }
  559.         return $definition->getObjectPath() ?? '';
  560.     }
  561.     private function createKey(ImportDefinitionInterface $definition, array $data): string
  562.     {
  563.         if (!$definition->getKey()) {
  564.             return '';
  565.         }
  566.         if (str_starts_with($definition->getKey(), '@')) {
  567.             return $this->expressionLanguage->evaluate(substr($definition->getKey(), 1), $data);
  568.         }
  569.         return $definition->getKey();
  570.     }
  571.     private function saveObject(Concrete $objectImportDefinitionInterface $definition, array $params): void
  572.     {
  573.         $persister null;
  574.         if ($definition->getPersister()) {
  575.             $persister $this->persisterRegistry->get($definition->getPersister());
  576.         }
  577.         if (!$persister instanceof PersisterInterface) {
  578.             $persister $this->persisterRegistry->get('persister');
  579.         }
  580.         if ($persister instanceof LoggerAwareInterface) {
  581.             $persister->setLogger($this->logger);
  582.         }
  583.         $persister->persist($object$definition$params);
  584.     }
  585. }