MigrateDownloadCountsCommand.php 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. <?php declare(strict_types=1);
  2. namespace Packagist\WebBundle\Command;
  3. use Symfony\Component\Console\Input\InputInterface;
  4. use Symfony\Component\Console\Input\InputOption;
  5. use Symfony\Component\Console\Output\OutputInterface;
  6. use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
  7. use Seld\Signal\SignalHandler;
  8. use Packagist\WebBundle\Entity\Package;
  /**
   * Console command `packagist:migrate-download-counts`.
   *
   * Collects the redis keys matching `dl:*:*`, leaves out the ones carrying
   * today's date suffix, groups the rest by the numeric id that follows `dl:`
   * and hands each group to the `packagist.download_manager` service.
   *
   * Only one instance runs at a time: the command is guarded by the `locker`
   * service, and the lock is always released once it has been acquired.
   */
  class MigrateDownloadCountsCommand extends ContainerAwareCommand
  {
      /**
       * Registers the command name and its description.
       */
      protected function configure()
      {
          $this
              ->setName('packagist:migrate-download-counts')
              ->setDescription('Migrates download counts from redis to mysql')
          ;
      }

      /**
       * Transfers all non-current download datapoints, one package at a time.
       *
       * The run stops early (between two packages) when the signal handler
       * reports that a signal was received; keys that were not handed to the
       * download manager yet are simply left for a later run.
       *
       * @param InputInterface  $input  console input, only read for the `verbose` option
       * @param OutputInterface $output console output, used for the "already running" notice
       *
       * @return void
       */
      protected function execute(InputInterface $input, OutputInterface $output)
      {
          $locker = $this->getContainer()->get('locker');
          $logger = $this->getContainer()->get('logger');

          // another migrate command is still active
          $lockAcquired = $locker->lockCommand($this->getName());
          if (!$lockAcquired) {
              if ($input->getOption('verbose')) {
                  $output->writeln('Aborting, another task is running already');
              }

              return;
          }

          // Everything that can throw after the lock was taken lives inside the
          // try block, otherwise a failing service lookup would leave the
          // command locked forever.
          try {
              $signal = SignalHandler::create(null, $logger);
              $downloadManager = $this->getContainer()->get('packagist.download_manager');
              $doctrine = $this->getContainer()->get('doctrine');

              // might be a large-ish dataset coming through here
              ini_set('memory_limit', '1G');

              $redis = $this->getContainer()->get('snc_redis.default_client');

              $now = new \DateTimeImmutable();
              $todaySuffix = ':'.$now->format('Ymd');
              $keysToUpdate = $redis->keys('dl:*:*');

              // skip today datapoints as we will store that to the DB tomorrow
              $keysToUpdate = array_filter($keysToUpdate, static function ($key) use ($todaySuffix) {
                  return strpos($key, $todaySuffix) === false;
              });

              // sort by package id, then package datapoint first followed by version datapoints
              // (keys sharing the same `dl:<digits>` prefix end up adjacent, which is
              // what the grouping loop below relies on; within a group the keys are
              // in reverse string order)
              usort($keysToUpdate, static function ($a, $b) {
                  $amin = preg_replace('{^(dl:\d+).*}', '$1', $a);
                  $bmin = preg_replace('{^(dl:\d+).*}', '$1', $b);

                  if ($amin !== $bmin) {
                      return strcmp($amin, $bmin);
                  }

                  return strcmp($b, $a);
              });

              // buffer keys per package id and process all keys for a given package one by one
              // to reduce SQL load
              $buffer = [];
              $lastPackageId = null;

              // Plain iteration instead of repeated array_shift(): shifting
              // re-indexes the remaining list every time, which is quadratic
              // on a large key set.
              foreach ($keysToUpdate as $key) {
                  if (!preg_match('{^dl:(\d+)}', $key, $m)) {
                      $logger->error('Invalid dl key found: '.$key);
                      continue;
                  }

                  $packageId = (int) $m[1];

                  // Compare against null explicitly so that an id of 0 is not
                  // mistaken for "no previous package".
                  if (null !== $lastPackageId && $lastPackageId !== $packageId) {
                      $logger->debug('Processing package #'.$lastPackageId);
                      $downloadManager->transferDownloadsToDb($lastPackageId, $buffer, $now);
                      $buffer = [];

                      $doctrine->getManager()->clear();

                      // The buffer is empty at this point, so stopping here
                      // does not leave a half-collected package behind.
                      if ($signal->isTriggered()) {
                          break;
                      }
                  }

                  $buffer[] = $key;
                  $lastPackageId = $packageId;
              }

              // process last package
              if ($buffer) {
                  $downloadManager->transferDownloadsToDb($lastPackageId, $buffer, $now);
              }
          } finally {
              $locker->unlockCommand($this->getName());
          }
      }
  }