Add error handling to streaming

This commit is contained in:
William Oldham
2023-12-19 17:43:36 +00:00
parent 6377a9ccaa
commit 207cdbb220

View File

@@ -88,29 +88,40 @@ app.get('/scrape', async (context) => {
} }
return streamSSE(context, async (stream) => { return streamSSE(context, async (stream) => {
const output = await providers.runAll({ try {
media, const output = await providers.runAll({
events: { media,
discoverEmbeds(evt) { events: {
writeSSEEvent(stream, 'discoverEmbeds', evt); discoverEmbeds(evt) {
writeSSEEvent(stream, 'discoverEmbeds', evt);
},
init(evt) {
writeSSEEvent(stream, 'init', evt);
},
start(evt) {
writeSSEEvent(stream, 'start', evt);
},
update(evt) {
writeSSEEvent(stream, 'update', evt);
},
}, },
init(evt) { });
writeSSEEvent(stream, 'init', evt);
},
start(evt) {
writeSSEEvent(stream, 'start', evt);
},
update(evt) {
writeSSEEvent(stream, 'update', evt);
},
},
});
if (output) { if (output) {
return await writeSSEEvent(stream, 'completed', output); await writeSSEEvent(stream, 'completed', output);
return await stream.close();
}
await writeSSEEvent(stream, 'noOutput', '');
return await stream.close();
} catch (e: any) {
await writeSSEEvent(stream, 'error', {
name: e.name,
message: e.message,
stack: e.stack,
});
return await stream.close();
} }
return await writeSSEEvent(stream, 'noOutput', '');
}); });
}); });
@@ -145,25 +156,36 @@ app.get('/scrape/embed', async (context) => {
} }
return streamSSE(context, async (stream) => { return streamSSE(context, async (stream) => {
const output = await providers.runEmbedScraper({ try {
id: embedInput.id, const output = await providers.runEmbedScraper({
url: embedInput.url, id: embedInput.id,
events: { url: embedInput.url,
update(evt) { events: {
writeSSEEvent(stream, 'update', evt); update(evt) {
writeSSEEvent(stream, 'update', evt);
},
}, },
}, });
});
if (output) { if (output) {
return await writeSSEEvent(stream, 'completed', output); await writeSSEEvent(stream, 'completed', output);
return await stream.close();
}
await writeSSEEvent(stream, 'noOutput', '');
return await stream.close();
} catch (e: any) {
await writeSSEEvent(stream, 'error', {
name: e.name,
message: e.message,
stack: e.stack,
});
return await stream.close();
} }
return await writeSSEEvent(stream, 'noOutput', '');
}); });
}); });
app.get('/scrape/embed', async (context) => { app.get('/scrape/source', async (context) => {
const queryParams = context.req.query(); const queryParams = context.req.query();
const turnstileEnabled = Boolean(context.env?.TURNSTILE_ENABLED); const turnstileEnabled = Boolean(context.env?.TURNSTILE_ENABLED);
@@ -194,21 +216,32 @@ app.get('/scrape/embed', async (context) => {
} }
return streamSSE(context, async (stream) => { return streamSSE(context, async (stream) => {
const output = await providers.runSourceScraper({ try {
id: sourceInput.id, const output = await providers.runSourceScraper({
media: sourceInput, id: sourceInput.id,
events: { media: sourceInput,
update(evt) { events: {
writeSSEEvent(stream, 'update', evt); update(evt) {
writeSSEEvent(stream, 'update', evt);
},
}, },
}, });
});
if (output) { if (output) {
return await writeSSEEvent(stream, 'completed', output); await writeSSEEvent(stream, 'completed', output);
return await stream.close();
}
await writeSSEEvent(stream, 'noOutput', '');
return await stream.close();
} catch (e: any) {
await writeSSEEvent(stream, 'error', {
name: e.name,
message: e.message,
stack: e.stack,
});
return await stream.close();
} }
return await writeSSEEvent(stream, 'noOutput', '');
}); });
}); });