Skip to main content

Flow Overview

This is a really general overview of the content intake flow, from a content source (Bookwire, Ingram) sending the content, to us inserting a publication into Farfalla.

While reading this documentation, have the code and the database opened on a side, it will be much easier to understand :).

Introduction

We are using an Event Sourcing approach. The content source will dump thousands of onix files on the S3 bucket. Those files corresponds to what we call content publishers, which are the many publishers the content source has agreements with.

It is important that you take a look at the naming convention for the Onix files.

The publishers send the data releated to the books via this onix files. On one file, we will get the metadata of a particular book, and that metadata can be later updated by another Onix file. Any value can change, like the price, availability, publishing date, etc... So it is important that before inserting a book into our system, we build "the history of the book", so we get the latest updates in our system.

After a book is inserted, we could also recive further updates to it, or the most common case: the book is withdrawn from sale. It's extremely important that we keep updating the books, and do this fast enough.

As discussed previously, an Onix file could include one, or many "products blocks". We call this products blocks Events. Each of this events define a the whole metadata of a book. It is important to remark the word whole. This means that subsequent events releated to the same book will completely overwrites previous ones. Onix 3 has support for defining incremental updates, that means, only sending the data which is actually updated: luckly, Ingram does not make use of this. It makes the process much easier.

Ingram intake

Ingram will dump all the content and Onix files into a S3 bucket. Each Onix file will include all the content for a single publisher, it could be hundread or even thousands of products in a single Onix file. You can read more about it here.

BW intake

BW will also dump all the content into a S3 bucket, but the Onix files will be located in several /data/store_XXXX subdirectories, where XXXX is a tenant ID. You can read more about this here. Each Onix file will include only one content and it will be named after it's ISBN. You can read more about the S3 bucket here.

Content Intake setup

On the Nova dashboard, you will find a resource called Content Intake.

This will hold the configuration for each content source.

It is important though to setup this correctly.

  • Name: not important
  • Enabled: this flag handles the first step of the content intake, where we check the FTP for new Onix files and turn them into Events. If this is disabled the content intake will stop, but we if we still have pending Events, those will still be processed.
  • Make dry requests: not being used.
  • Use dummy files: when enabled, Medusa will send to Farfalla "dummy files" (lightweight EPUBs and PDFs) instead of the original publication files, which can be huge. This is used mostly for testing purposes on the staging env, where we don't care about having the real content.
  • Disk Name: This is the name of the s3 bucket.
  • Show in marketplace: Used by BW. This will define include the intaked content in the marketplace or not.
  • Uploads content on behalf of tenants: Used by BW. When true, the intaked content will be automatically shared (using shared licence) from the base tenant ID to another tenant, based on the directory structure (/data/store_XXXX). i.e: if the Onix file is placed at /data/store_1234, the content will be send to BW tenant and then shared with the tenant ID 1234
  • Tenand ID: Tenant ID in farfalla.
  • Tenant Domain: the final domain of the tenant that corresponds to this content publisher.
  • Tenant Api Key: Used for authentication in Farfalla.

Jobs Orchestration

See: app/Console/Kernel.php

The content intake process involves many jobs. On this file, is where the jobs are scheduled to run thanks to the Laravel Tasks Scheduler.

This is the current list of jobs involved on the content intake process:

  • DiscoverOnixFilesJob
  • OnixFileCrawler
  • DiscoverOnixEventsJob
  • OnixEventCrawlerJob
  • IssueIngestCrawlerJob
  • IssueUpdateCrawlerJob

You can customize the different batch sizes and turn off and on different jobs/steps of the content intake, using env variables. Remember that for changes to take effect, you will have to re-deploy from the Vapor dashboard.

This are the current good and tested default values.

// This will control the OnixFileCrawler
FILE_BATCH=10
FILE_ENABLED=true

// This will control the OnixEventCrawlerJob
EVENT_ENABLED=true
EVENT_BATCH=100

// This will control the IssueIngestCrawlerJob
INGEST_ENABELD=true
INGEST_BATCH=10
INGEST_RETRIES=3

// This will control the IssueUpdateCrawlerJob
UPDATE_BATCH=15
UPDATE_ENABLED=true

All jobs are independent from each other. Regardless, it is important to avoid the overlapping of jobs as much as possible - this means, have two jobs to be scheduled at the same time.

When scheduling jobs, take into account:

  • Do not to run the ingest and update crawlers at the same time with really big batches (> 100): this could lead to deadlocks on the onix_issues table.
  • If the content source includes multiple products in each Onix file (like Ingram does), do not try to discover big batches of onix files and run this job frequently. Some Onix files are HUGE (GBs of magnitude), and those could take 30-45 min to fully process. This big files will really tax the whole system and hit hard on the database.

Keep a track of the onix files

See: app/Jobs/DiscoverOnixFiles.php
See: onix_files table

We have to keep track of the Onix files that we got and which ones have been processed. The job DiscoverOnixFiles will scan the S3 bucket at the /data path, and create an entry for each onix under the onix_files table.

This are the most relevant things about this table:

  • We make use of the checksum value to make sure that each files has a unique record on the table.
  • The sent_at prop is important as its plays as reference for the events, that will let us process these in order in the future. The sent_at date is encoded on the file name.
  • The publisher field is extremely important to know which files correspond to which content_publisher.
  • The status field can take some values. Current values are: discovered, on-queue, done, fail. The initial value discovered means that the file has been discovered but not processed yet.

Processing the files in batches

See: app/Jobs/OnixFileCrawler.php
See: onix_files table

This job has only one responsability. Take a batch of files with the discovered status, and push them to the queue to be processed (with the DiscoverOnixEventsJob).

This job doesn't know nor care about the size of the Onix Files. As said above, big files can really tax the whole system, so it's important to dispatch this job only once/twice every hour, and keep the batch sizes small enough so the chances of processing two or more big Onix files at the same time is small (I would say that a batch size of 10 is pretty conservative but secure).

Remember that you can always go to the Nova Dashboard and dispatch the processing of Onix files manually via an Action.

Discovering the events

See: app/Jobs/DiscoverOnixEvents.php
See: onix_events table

Discovering the "events" just means opening the Onix file and creating an entry on onix_events for each <Product /> node that we find.

How it works

The job makes uses of a helper class called OnixParser. This is a simple class that will stream the Onix file from S3 and provide a method ->each() that takes a callback.

The first param of the callback will be the <Product /> node as an associative array, and the second argument is the type (this is helpful because Onix files have a <Header /> section that we want to ignore). The each method will iterate over the nodes of the XML automatically: you can return false inside the callback to stop the loop.

While the file is being processed, it will be marked with the status = on-queue. When it finishes, we will set status = done.

In case of failure, files will be marked as status = failed. We can safely redispatch the discovery of this file via an Action on the Medusa Nova Dashboard, as the events discovered previously to the failure won't be duplicated, thanks to the md5 attribute (a hash of the payload of the event).

When the events are created, they are set with the status = null (of the onix_events table). The reason behind this is: processing an Onix file can take time. We want to avoid the OnixEventCrawler job (see later) to work on the events of Onix files that have been not fully processed yet. After all events have been discovered, the app/Jobs/SetOnixEventReady job will dispatch: it will set the status = discovered of those events.

About big files

Vapor and AWS SQS has a limit on the time a job can run: up to 15 minutes. This means that big Onix files (like 1GB on size) will take more than 15 min to process.

To overcome this, we have to stop processing a file, take a note at which node we are currently at, and self-redispatch the job to resume on the node that we left.

The OnixParser class provides a simple method called ->jumpToNode($node) which is self descriptive.

The OnixEventCrawler

See: app/Jobs/OnixEventCrawler

This job is scheduled to run once every minute. Basically it will fetch a number of events from the database and send them to processing.

It makes uses of Atomic Locks to guarantee that only an instance of the job is running at the same time.

Fetching the events

In case you need to tweak the fetching query, please take into account that:

  • We must group events from the same combination content_publisher-record_reference, and make sure to process THE LAST EVENT.
  • Batch sizes must be kept relatively small. I had success with a size of up to 300. This value is used when I need to process the events fast. Normally, a batch of 100 is enough.
  • The order of the events is extremely important, hence why the ->orderBy('sent_at')->orderBy('id') on the query.

Note: the field sent_at represents a timestamp of the date the Onix file was sent.

Why we must group by record_reference ?

The Onix specification says that the record reference is an attribute used by the content distributor to uniquely identify a book into their "world". This record reference could be anything: numbers, uuids, or even the ISBN of the books.

We have no guarantee that two or more publishers are using the same convention to create the record reference. So thats why we need to form groups where the key is content_publisher-record_reference.

There is a really low chance that a batch size of 100 event would grab two or more events for the same group key (content_publisher-record_reference), but the possibility is there.

Batching

Once the groups are formed, we can dispatch those in batch, to be executed in parallel. Once the batch finishes procesing, we free the job lock.

Processing events: filling the onix_issues table

See: app/Jobs/ProcessOnixEvent

This job will get an array of ids, and has to decide which event must be processed.

The rule is simple: last events completely override previous ones. So if we have a list of events, we have to only process the newest one (higher sent_at).

Processing the event means creating or updating an entry on the onix_issues table. On this table, you will find a list of all the publications that we have discovered, with some useful metadata destructured to make our life easier. In this context, the most important one here is a reference to the last_event_id.

The job also makes use of the ProductExtractor::class, a helper class that will take a payload in the contructor and provide methods to get metadata all the metadata we care about, like the title, isbn, BISAC, etc... How we extract the metadata is really dependant of what we want to get, since some data is mandatory, other can have multiple values or not be present at all. You will need to review the Onix 3 Specification to learn more.

Ingram (so we) don't distinghish between events that create (the first definition) or updates books. So we must check if the issue that the events refeers to have been created by other event in the past (check for a combination of content_publisher and record_reference, don't rely on the ISBN!).

If that issue does not exists, then we create it with the metadata from the event.

In the case it exists, we only update the record if the previous event the onix_issue record points to is older than the current one. Remember that new events override older ones.

The same logic of status with the onix_files applies here.

Ingesting publications

See: app/Jobs/IssueIngestCrawler

Picking the right ones

The process of ingesting publications is very similar to the one of events. It will scan the onix_issues table for suitable books that can be ingested.

Try to keep this query as simple and intact as possible: it can quickly became slow, because it is currently optimized with indexes suitable for the where's that we are currently doing. A base query must contain the following:

  • publishing_status = active: this are the only books available to ingest.
  • ingested_at = null: this books have not yet been ingested into Farfalla
  • ignore = null: the ignore field represents books that should not be ingested for various reasons.
  • group by isbn: this is a limitation of Farfalla - the ISBN field will become the external_id on the issues table.
  • share_with_tenant_id is not null): we are currently in the process of removing Ingram so we are using this to process only BW events. We should eventually remove this.
  • Keep the batch size small: to avoid race conditions. Sizes of 10 are working almost perfectly, sizes of +20 become problematic.

From this query, we will get groups of isbn => [...onix_issue_ids]. In the case that we get more than two ids for the same isbn, we have to decide which book we are going to actually ingest. For this, we will pick EPUBs over PDFs.

The result will the a pool of ids to be ingested, and a pool of ids that should be discarted. The first ones are going to be pushed to the queue, and we will mark the latter ones as ignore = ignore

Ingesting the publications

See: app/Jobs/IngestOnixIssue

On this job, we are going to build the request and send it to Farfalla (`https://tenant_url/integration-api/v1/onix)

Since we are ingesting an issue, we need to tell Farfalla where it should look for the files of the publication. So a temporary temporary URL is generated to download the files, directly from the content source s3 bucket. We are not sending the files on the request.

This is the current payload of the request:

{
'show_in_marketplace' : bool,
'share_with_tenant_id' : number,
'author': 'string[]',
'description: 'string',
'external_id' : 'string',
'lang' : 'string',
'bisac' : 'string[]',
'name' : 'string',
'prices' : [
{
'amount': 'number',
'currenty': 'string',
'discount': 'number'
}
],
'type' : 'string',
'publishing_status' : 'string',
'published_at' : 'date',
'publisher' : 'string[]',
'sales_rights' : {
'included': string,
'excluded': string,
},
'user_token' : 'string',
'issue_files': [
{
'name': 'string',
'extension': 'string',
'file_name': 'string',
'url': 'string',
'etag': 'string'
}
],
'keywords' : 'string[]',
}

After the body payload is generated, we are going to send the request. And the following situations may occur.

If we get a status = 200, the issue has been ingested successfully, so set the ingested_at = now().

If we get a status = 400ish, corresponding to a client error, this means that the request failed the validation in Farfalla. We are going to tag the onix_issue as ignore = client-error, and log the Farfalla response, to take further actions later.

If we get a status = 500ish, this means that an exception happened in Farfalla. Most of the times, this is due to race conditions. I would like this ingest to be retried again the future.

So that's where the shouldBeRetried() comes into play: on the cache, it stores the amount of times that this "ingest" has been attempted.

  • If the method returns true, we will just finish the job and by not setting the ignore attribute to any value, we will let the next OnixIngestCrawler job to take this issue again and retry.
  • In the case that the method returns false, we will mark the issue as ignored = server-error, log the response (on production env, since debug = false, does not provide much help), and finish.

What are race conditions ?

What is a race condition: Wikipedia article

Specifically to this process, the race conditions happen when the batch of the IssueIngestCrawler is big enough that multiple issues that share the same taxonomies are tried to be inserted at the same time.

Imagine we have two issues that share the publisher = publicala. Since in Farfalla the taxonomies must be unique, one issue will check the terms table and see that the there is no term publisher = publicala, and it will attempt to create it. At the exact same time, the second issue will come in and came to a similar conclusion. After that, the first query will finish inserting the term in the table, and the second one will throw an Exception because the term already exist on the database. This will lead to a 500 error.

I have spent many hours trying different approaches to solve the problem. The only reliable solution I found is: make the batch size as small as possible.

Keeping publications up to date

See: app/Jobs/IssueUpdateCrawler.php

After a publication has been inserted in Farfalla, we can potentially get updates or withdraw notices, that will require us to remove the publication from sale.

Medusa does not and must not care about the type of update a publication gets.

How do we know which issues has been updated? When ingested_at != null && ingested_at < updated_at. We will look for those issues and create batches to send to update. This batches uses the same job that we use for insertion: the app/Jobs/IngestOnixIssue.php.

Farfalla itself will take care of deciding if a publication should be created, updated or removed.

I found that we can increment the batch size of this job a bit more (currently using 50) without running into race conditions or deadlocks.


X

Graph View