
When A New File Arrives In S3, Trigger Luigi Task

I have a bucket with new objects getting added at random intervals with keys based on their time of creation. For example: 's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strf

Solution 1:

Below is a very rough outline of how it could look. The main difference from your outline is that Luigi works as a pull system: you specify the output you want first, and that triggers the other tasks on which the output depends. So rather than naming things after the time the crawl ends, it is easier to name them after something you know at the start. Doing it the other way is possible, but it adds a lot of unnecessary complication.

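import luigi
from luigi.contrib.s3 import S3Target


class CrawlTask(luigi.Task):
    crawltime = luigi.DateParameter()

    def requires(self):
        pass  # no upstream dependencies

    def get_filename(self):
        return "s3://my-bucket/crawl_{}.csv".format(self.crawltime)

    def output(self):
        return S3Target(self.get_filename())

    def run(self):
        # perform_crawl is your own crawl routine; it is not defined here
        perform_crawl(s3_filename=self.get_filename())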


class CleanTask(luigi.Task):
    crawltime = luigi.DateParameter()

    def requires(self):
        return CrawlTask(crawltime=self.crawltime)

    def get_filename(self):
        return "s3://my-bucket/clean_crawl_{}.csv".format(self.crawltime)

    def output(self):
        return S3Target(self.get_filename())

    def run(self):
        perform_clean(input_file=self.input().path, output_filename=self.get_filename())


class MatchTask(luigi.Task):
    crawltime = luigi.DateParameter()

    def requires(self):
        return CleanTask(crawltime=self.crawltime)

    def output(self):
        return  # ?? whatever the output of this task is

    def run(self):
        perform_match(input_file=self.input().path)
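
To make the pull behaviour concrete, here is a toy sketch that uses only the standard library. It is not Luigi and not how Luigi's scheduler is implemented; the `Task`, `Crawl`, `Clean` and `build` names are made up for illustration, and local temp files stand in for the S3 targets. The idea it shows is the same one as in the outline: you ask for the final output, and only the tasks whose outputs are missing get run.

```python
import os
import tempfile


class Task:
    """Toy stand-in for a Luigi-style task (hypothetical, not Luigi's API)."""

    def __init__(self, workdir, crawltime):
        self.workdir = workdir
        self.crawltime = crawltime

    def requires(self):
        return None  # no upstream task by default

    def output(self):
        raise NotImplementedError

    def run(self):
        raise NotImplementedError

    def complete(self):
        # A task counts as done when its output file already exists.
        return os.path.exists(self.output())


class Crawl(Task):
    def output(self):
        return os.path.join(self.workdir, "crawl_{}.csv".format(self.crawltime))

    def run(self):
        with open(self.output(), "w") as f:
            f.write("raw\n")


class Clean(Task):
    def requires(self):
        return Crawl(self.workdir, self.crawltime)

    def output(self):
        return os.path.join(self.workdir, "clean_crawl_{}.csv".format(self.crawltime))

    def run(self):
        with open(self.requires().output()) as src, open(self.output(), "w") as dst:
            dst.write(src.read().upper())


def build(task, log):
    """Pull: start from the wanted output and run only what is missing."""
    if task.complete():
        return
    upstream = task.requires()
    if upstream is not None:
        build(upstream, log)
    task.run()
    log.append(type(task).__name__)


workdir = tempfile.mkdtemp()
first_run = []
build(Clean(workdir, "2016-01-01"), first_run)   # runs Crawl, then Clean
second_run = []
build(Clean(workdir, "2016-01-01"), second_run)  # outputs exist, nothing runs
```

Because the file names depend only on `crawltime`, which is known before anything runs, the second call can tell that the work is already done. That is why naming outputs after the crawl's end time makes things harder.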

Solution 2:

What you could do is create a larger system that encapsulates both your crawls and your processing, so that you don't have to check S3 for new objects. I haven't used Luigi before, but maybe you can turn your Scrapy job into a task and run your processing task when it finishes. Either way, I don't think 'checking' S3 for new objects is a good idea, because (1) you will have to make a lot of API calls, and (2) you will need to write a fair amount of code to decide whether something is 'new' or not, which could get hairy.
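
For a sense of what the 'is it new?' bookkeeping involves, here is a minimal sketch, assuming the bucket listing has already been fetched into a plain list of keys. The `find_new_keys` function and the example keys are hypothetical; in practice the listing would come from repeated, paginated S3 list requests, which is where the API-call cost comes from.

```python
def find_new_keys(listed_keys, seen_keys):
    """Return the keys not processed yet (sorted) and the updated seen set."""
    new_keys = sorted(set(listed_keys) - seen_keys)
    return new_keys, seen_keys | set(new_keys)


# Hypothetical state: this set would have to be stored somewhere durable,
# otherwise every restart treats the whole bucket as new.
seen = set()

first_poll = ["mass/a.csv", "mass/b.csv"]          # made-up keys
new, seen = find_new_keys(first_poll, seen)

second_poll = ["mass/a.csv", "mass/b.csv", "mass/c.csv"]
newer, seen = find_new_keys(second_poll, seen)     # only mass/c.csv is new
```

Even this small version needs persistent state and a full listing on every poll, which is the complication the pull-style approach avoids.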
