Transaction Importer
A robust and scalable solution for importing affiliate transaction data.
This project implements a recurring data pipeline that imports affiliate transaction data from Digidip, a network aggregating over 70,000 merchants. On a fixed schedule, the system retrieves, parses, and persists timestamped transaction data via the Digidip API, linking each entry to the appropriate merchant and capturing key financial metrics. The architecture cleanly separates scheduling, API access, parsing, and persistence to ensure robustness and maintainability.
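The separation of API access, parsing, and persistence described above could be sketched roughly as follows. This is a minimal illustration, not the project's actual code: every name here (`Transaction`, `TransactionSource`, `TransactionParser`, `TransactionStore`, `TransactionImport`, `demoImport`) and the raw record fields are assumptions made for the example, and the source is an in-memory stub rather than a call to the Digidip API.

```kotlin
import java.math.BigDecimal
import java.time.Instant

// All names in this sketch are hypothetical; they only illustrate the layering.

/** One parsed affiliate transaction, linked to a merchant. */
data class Transaction(
    val id: String,
    val merchantId: String,
    val timestamp: Instant,
    val commission: BigDecimal,
)

/** API access: returns raw records created after [since]. */
fun interface TransactionSource {
    fun fetchSince(since: Instant): List<Map<String, String>>
}

/** Parsing: turns one raw record into a Transaction, or null if it is malformed. */
fun interface TransactionParser {
    fun parse(raw: Map<String, String>): Transaction?
}

/** Persistence: stores parsed transactions. */
fun interface TransactionStore {
    fun saveAll(transactions: List<Transaction>)
}

/** Orchestration: wires the three layers together for a single import run. */
class TransactionImport(
    private val source: TransactionSource,
    private val parser: TransactionParser,
    private val store: TransactionStore,
) {
    /** Fetches, parses, and stores; returns how many transactions were stored. */
    fun run(since: Instant): Int {
        val parsed = source.fetchSince(since).mapNotNull(parser::parse)
        store.saveAll(parsed)
        return parsed.size
    }
}

/** Usage example with in-memory stand-ins for the API and the database. */
fun demoImport(): Int {
    val saved = mutableListOf<Transaction>()
    val source = TransactionSource { _ ->
        listOf(
            mapOf("id" to "t1", "merchant" to "m1", "ts" to "2024-01-01T10:00:00Z", "commission" to "1.50"),
            mapOf("id" to "t2", "merchant" to "m2"), // malformed: timestamp and commission missing
        )
    }
    val parser = TransactionParser { raw ->
        val id = raw["id"]
        val merchant = raw["merchant"]
        val ts = raw["ts"]
        val commission = raw["commission"]
        if (id == null || merchant == null || ts == null || commission == null) {
            null // skip records that lack a required field
        } else {
            Transaction(id, merchant, Instant.parse(ts), BigDecimal(commission))
        }
    }
    val store = TransactionStore { transactions -> saved += transactions }
    return TransactionImport(source, parser, store).run(Instant.EPOCH)
}
```

Because each layer sits behind a small interface, the scheduler only needs to call `run`, and any layer can be replaced with a stub in tests, as `demoImport` does.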
Implementation
Schedules and executes the recurring task.
/**
 * Task class responsible for scheduling and executing the recurring job that imports
 * transaction data from the Digidip API. This class delegates the actual processing to the
 * DigidipTransactionTaskExecutor, which handles the import logic using the persisted task data.
 */
@Component
class DigidipTransactionImporterTask(
    private val dtte: DigidipTransactionTaskExecutor,
) {

    companion object {
        private val DIGIDIP_TRANSACTION_IMPORTER_TASK = TaskDescriptor.of(
            "digidip-transaction-importer-task",
            ScheduleAndTimestampOffset::class.java,
        )
    }

    /**
     * Ensures that the recurring task for importing transactions is scheduled.
     * This method is triggered when the application context is refreshed. If the task does not exist,
     * it is scheduled with initial data, including a starting schedule and a flag indicating no prior data.
     */
    @EventListener
    fun setupRecurringTasks(event: ContextRefreshedEvent) {
        val schedulerClient = event.applicationContext.getBean(SchedulerClient::class.java)

        val immediately = Schedules.fixedDelay(Duration.ofSeconds(0))

        schedulerClient.scheduleIfNotExists(
            DIGIDIP_TRANSACTION_IMPORTER_TASK.instance("default").data(
                ScheduleAndTimestampOffset(
                    immediately, NO_INITIAL_DATA
                )
            ).scheduledAccordingToData()
        )
    }

    /**
     * Defines a recurring task that triggers the import of transactions from the Digidip API.
     *
     * This task is configured with a persistent schedule, meaning its execution state (including
     * any scheduling metadata) is stored in the database. Each time the task is executed, it delegates
     * the import logic to the DigidipTransactionTaskExecutor using the task's persisted data.
     *
     * @return A RecurringTaskWithPersistentSchedule object that manages the execution of the import job.
     */
    @Bean("digidipTransactionImporterTaskBean")
    fun digidipTransactionImporterTask(): RecurringTaskWithPersistentSchedule<ScheduleAndTimestampOffset>? {
        return Tasks.recurringWithPersistentSchedule(
            DIGIDIP_TRANSACTION_IMPORTER_TASK
        ).executeStateful { taskInstance, _ ->
            dtte.executeTask(taskInstance.data, DIGIDIP_NETWORK_NAME)
        }
    }
}
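The task above passes its persisted data to the executor on every run. The executor itself is not shown, so the following is only a sketch of how a timestamp offset might be carried from one run to the next, assuming the executor returns an updated copy of the data it receives. `ImportState`, `RawTransaction`, `executeImportStep`, and `demoTwoRuns` are hypothetical names introduced for this example; the real `ScheduleAndTimestampOffset` and `DigidipTransactionTaskExecutor` may work differently.

```kotlin
import java.time.Instant

// Hypothetical stand-in for the persisted task data; the real class is not shown in the source.
data class ImportState(val lastImportedAt: Instant?)

// Hypothetical raw record as it might come back from the remote API.
data class RawTransaction(val id: String, val createdAt: Instant)

/**
 * Runs one import step: fetches everything newer than the stored offset,
 * hands it to [persist], and returns the state to store for the next run.
 */
fun executeImportStep(
    state: ImportState,
    fetchSince: (Instant) -> List<RawTransaction>,
    persist: (List<RawTransaction>) -> Unit,
): ImportState {
    // With no prior data, start from the epoch so the first run imports everything.
    val since = state.lastImportedAt ?: Instant.EPOCH
    val batch = fetchSince(since)
    if (batch.isEmpty()) return state // nothing new: keep the offset unchanged
    persist(batch)
    // Advance the offset to the newest transaction seen in this batch.
    return state.copy(lastImportedAt = batch.maxOf { it.createdAt })
}

/** Usage example: two consecutive runs against an in-memory "API". */
fun demoTwoRuns(): Pair<ImportState, Int> {
    val remote = listOf(
        RawTransaction("t1", Instant.parse("2024-01-01T10:00:00Z")),
        RawTransaction("t2", Instant.parse("2024-01-01T11:00:00Z")),
    )
    val stored = mutableListOf<RawTransaction>()
    val fetch = { since: Instant -> remote.filter { it.createdAt.isAfter(since) } }
    val persist = { batch: List<RawTransaction> -> stored += batch }

    val afterFirst = executeImportStep(ImportState(null), fetch, persist)
    val afterSecond = executeImportStep(afterFirst, fetch, persist)
    return afterSecond to stored.size
}
```

In this sketch the second run finds nothing newer than the stored offset, so no transaction is persisted twice. A production importer would likely also need to handle transactions that share a timestamp with the offset or arrive late, which this example does not address.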