diff --git a/constants/index.ts b/constants/index.ts index d835c275..42e3d553 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -295,3 +295,6 @@ export const CLIENT_PAYMENT_EXPIRATION_TIME = (7) * (24 * 60 * 60 * 1000) // (nu // Enough for eCash IFP when created export const MAX_TXS_PER_ADDRESS = 250000 + +// Will look for this many days before to check if there are gaps in prices +export const N_DAYS_LOOK_FOR_PRICE_GAPS = 30 diff --git a/services/chronikService.ts b/services/chronikService.ts index b3e96938..97738e17 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -714,6 +714,7 @@ export class ChronikBlockchainClient { } } } else if (msg.msgType === 'TX_CONFIRMED') { + if (this.isAlreadyBeingProcessed(msg.txid, true)) return try { const transaction = await this.fetchTxWithRetry(msg.txid) const addressesWithTransactions = await this.getAddressesForTransaction(transaction) @@ -731,6 +732,8 @@ export class ChronikBlockchainClient { await markTransactionsOrphaned(msg.txid) } else { console.error(`${this.CHRONIK_MSG_PREFIX}: confirmed tx handler failed for ${msg.txid}`, e) + const { [msg.txid]: _, ...rest } = this.lastProcessedMessages.confirmed + this.lastProcessedMessages.confirmed = rest } } } else if (msg.msgType === 'TX_ADDED_TO_MEMPOOL') { @@ -803,8 +806,9 @@ export class ChronikBlockchainClient { const pageSize = 200 let blockPageTxs = (await this.chronik.blockTxs(blockHash, page, pageSize)).txs let blockTxsToSync: Tx[] = [] - while (blockPageTxs.length > 0 && blockTxsToSync.length !== this.confirmedTxsHashesFromLastBlock.length) { - const thisBlockTxsToSync = blockPageTxs.filter(tx => this.confirmedTxsHashesFromLastBlock.includes(tx.txid)) + const confirmedTxHashes = new Set(this.confirmedTxsHashesFromLastBlock) + while (blockPageTxs.length > 0 && blockTxsToSync.length < confirmedTxHashes.size) { + const thisBlockTxsToSync = blockPageTxs.filter(tx => confirmedTxHashes.has(tx.txid)) blockTxsToSync = [...blockTxsToSync, ...thisBlockTxsToSync] page += 1 blockPageTxs = (await this.chronik.blockTxs(blockHash, page, pageSize)).txs diff --git a/services/priceService.ts b/services/priceService.ts index b015121c..f9cf319d 100644 --- a/services/priceService.ts +++ b/services/priceService.ts @@ -2,7 +2,7 @@ import axios from 'axios' import { Prisma, Price } from '@prisma/client' import config from 'config' import prisma from 'prisma-local/clientInstance' -import { PRICE_API_TIMEOUT, PRICE_API_MAX_RETRIES, PRICE_API_DATE_FORMAT, RESPONSE_MESSAGES, NETWORK_TICKERS, XEC_NETWORK_ID, BCH_NETWORK_ID, USD_QUOTE_ID, CAD_QUOTE_ID, N_OF_QUOTES, HUMAN_READABLE_DATE_FORMAT } from 'constants/index' +import { PRICE_API_TIMEOUT, PRICE_API_MAX_RETRIES, PRICE_API_DATE_FORMAT, RESPONSE_MESSAGES, NETWORK_TICKERS, XEC_NETWORK_ID, BCH_NETWORK_ID, USD_QUOTE_ID, CAD_QUOTE_ID, N_OF_QUOTES, N_DAYS_LOOK_FOR_PRICE_GAPS } from 'constants/index' import { validatePriceAPIUrlAndToken, validateNetworkTicker } from 'utils/validators' import moment from 'moment' @@ -154,44 +154,105 @@ export async function getAllPricesByNetworkTicker ( } export async function syncPastDaysNewerPrices (): Promise { - console.log('[PRICES] Syncing prices...') - const lastPrice = await prisma.price.findFirst({ - orderBy: [{ timestamp: 'desc' }], - select: { timestamp: true } + console.log(`[PRICES] Syncing missing prices, including gaps on the last ${N_DAYS_LOOK_FOR_PRICE_GAPS} days...`) + + const today = moment.utc().startOf('day') + const windowStart = moment.utc().subtract(N_DAYS_LOOK_FOR_PRICE_GAPS, 'days').startOf('day') + + const existingPrices = await prisma.price.findMany({ + where: { + timestamp: { + gte: windowStart.unix(), + lte: today.unix() + }, + quoteId: USD_QUOTE_ID + }, + select: { timestamp: true, networkId: true } }) - if (lastPrice === null) throw new Error('No prices found, initial database seed did not complete successfully') - const lastDateInDB = moment.unix(lastPrice.timestamp) - const date = moment().startOf('day') - const daysToRetrieve: string[] = [] + const xecTimestamps = new Set( + existingPrices.filter(p => p.networkId === XEC_NETWORK_ID).map(p => p.timestamp) + ) + const bchTimestamps = new Set( + existingPrices.filter(p => p.networkId === BCH_NETWORK_ID).map(p => p.timestamp) + ) + + const expectedDays: Array<{ formatted: string, timestamp: number }> = [] + const cursor = today.clone() + while (cursor.isSameOrAfter(windowStart)) { + expectedDays.push({ formatted: cursor.format(PRICE_API_DATE_FORMAT), timestamp: cursor.unix() }) + cursor.subtract(1, 'day') + } + + const missingXECDays = expectedDays.filter(d => !xecTimestamps.has(d.timestamp)) + const missingBCHDays = expectedDays.filter(d => !bchTimestamps.has(d.timestamp)) - console.log(`[PRICES] Last price found is for ${lastDateInDB.format(HUMAN_READABLE_DATE_FORMAT)}.`) - while (date.isAfter(lastDateInDB)) { - daysToRetrieve.push(date.format(PRICE_API_DATE_FORMAT)) - date.add(-1, 'day') + const totalMissing = missingXECDays.length + missingBCHDays.length + if (totalMissing === 0) { + console.log(`[PRICES] No missing prices found in the last ${N_DAYS_LOOK_FOR_PRICE_GAPS} days.`) + return } - console.log(`[PRICES] Will try to retrieve ${daysToRetrieve.length} prices.`) - const allXECPrices = await getAllPricesByNetworkTicker(NETWORK_TICKERS.ecash, false) - const allBCHPrices = await getAllPricesByNetworkTicker(NETWORK_TICKERS.bitcoincash, false) + console.log(`[PRICES] Found ${missingXECDays.length} missing XEC days and ${missingBCHDays.length} missing BCH days. Fetching from API...`) + + const failedDays: string[] = [] + + const allXECPrices = missingXECDays.length > 0 ? await getAllPricesByNetworkTicker(NETWORK_TICKERS.ecash, false) : null + const allBCHPrices = missingBCHDays.length > 0 ? await getAllPricesByNetworkTicker(NETWORK_TICKERS.bitcoincash, false) : null + + const xecBulkDays = new Set(allXECPrices?.map(p => p.day) ?? []) + const bchBulkDays = new Set(allBCHPrices?.map(p => p.day) ?? []) if (allXECPrices !== null) { + const missingDaySet = new Set(missingXECDays.map(d => d.formatted)) await Promise.all( allXECPrices - .filter(p => daysToRetrieve.includes(p.day)) - .map(async price => await upsertPricesForNetworkId(price, XEC_NETWORK_ID, moment(price.day).unix())) + .filter(p => missingDaySet.has(p.day)) + .map(async price => await upsertPricesForNetworkId(price, XEC_NETWORK_ID, moment.utc(price.day).unix())) ) } if (allBCHPrices !== null) { + const missingDaySet = new Set(missingBCHDays.map(d => d.formatted)) await Promise.all( allBCHPrices - .filter(p => daysToRetrieve.includes(p.day)) - .map(async price => await upsertPricesForNetworkId(price, BCH_NETWORK_ID, moment(price.day).unix())) + .filter(p => missingDaySet.has(p.day)) + .map(async price => await upsertPricesForNetworkId(price, BCH_NETWORK_ID, moment.utc(price.day).unix())) + ) + } + + const xecStillMissing = missingXECDays.filter(d => !xecBulkDays.has(d.formatted)) + const bchStillMissing = missingBCHDays.filter(d => !bchBulkDays.has(d.formatted)) + + for (const day of xecStillMissing) { + const price = await withRetries( + async () => await getPriceForDayAndNetworkTicker(moment.utc(day.formatted), NETWORK_TICKERS.ecash), + { throwOnFailure: false, context: { day: day.formatted, network: 'XEC' } } ) + if (price !== null) { + await upsertPricesForNetworkId(price, XEC_NETWORK_ID, day.timestamp) + } else { + failedDays.push(`XEC ${day.formatted}`) + } } - console.log('[PRICES] All past prices have been synced.') + for (const day of bchStillMissing) { + const price = await withRetries( + async () => await getPriceForDayAndNetworkTicker(moment.utc(day.formatted), NETWORK_TICKERS.bitcoincash), + { throwOnFailure: false, context: { day: day.formatted, network: 'BCH' } } + ) + if (price !== null) { + await upsertPricesForNetworkId(price, BCH_NETWORK_ID, day.timestamp) + } else { + failedDays.push(`BCH ${day.formatted}`) + } + } + + if (failedDays.length > 0) { + console.warn(`[PRICES] Could not fetch prices for: ${failedDays.join(', ')}`) + } else { + console.log('[PRICES] All missing prices have been synced.') + } } export async function syncCurrentPrices (): Promise {