From ba9d0ec73971eba01f7678b1244231c39f06c6f0 Mon Sep 17 00:00:00 2001 From: Alex Kern Date: Thu, 26 Dec 2024 23:10:07 -0800 Subject: [PATCH] more progress --- src/app/api/answer/route.ts | 38 ++++++++++++ src/app/api/create/route.ts | 2 +- src/app/api/destroy/route.ts | 2 +- src/app/api/offer/route.ts | 4 +- src/app/api/renew/route.ts | 2 +- src/app/download/[...slug]/page.tsx | 2 +- src/channel.ts | 96 +++++++++++++++++++++-------- src/hooks/useDownloader.ts | 67 +++++++++++++++++++- src/hooks/useUploaderChannel.ts | 29 ++++++--- 9 files changed, 200 insertions(+), 42 deletions(-) create mode 100644 src/app/api/answer/route.ts diff --git a/src/app/api/answer/route.ts b/src/app/api/answer/route.ts new file mode 100644 index 0000000..415c310 --- /dev/null +++ b/src/app/api/answer/route.ts @@ -0,0 +1,38 @@ +import { NextRequest, NextResponse } from 'next/server' +import { channelRepo } from '../../../channel' + +export async function POST(request: NextRequest): Promise { + const { slug, offerID, answer } = await request.json() + + if (!slug) { + return NextResponse.json({ error: 'Slug is required' }, { status: 400 }) + } + + if (!offerID) { + return NextResponse.json({ error: 'Offer ID is required' }, { status: 400 }) + } + + if (!answer) { + return NextResponse.json({ error: 'Answer is required' }, { status: 400 }) + } + + const success = await channelRepo.answer(slug, offerID, answer) + return NextResponse.json({ success }) +} + +export async function GET(request: NextRequest): Promise { + const { searchParams } = new URL(request.url) + const slug = searchParams.get('slug') + const offerID = searchParams.get('offerID') + + if (!slug) { + return NextResponse.json({ error: 'Slug is required' }, { status: 400 }) + } + + if (!offerID) { + return NextResponse.json({ error: 'Offer ID is required' }, { status: 400 }) + } + + const answer = await channelRepo.fetchAnswer(slug, offerID) + return NextResponse.json({ answer }) +} diff --git a/src/app/api/create/route.ts b/src/app/api/create/route.ts index d160e89..7ebfc9b 100644 --- a/src/app/api/create/route.ts +++ b/src/app/api/create/route.ts @@ -2,6 +2,6 @@ import { NextResponse } from 'next/server' import { Channel, channelRepo } from '../../../channel' export async function POST(): Promise { - const channel: Channel = await channelRepo.create() + const channel: Channel = await channelRepo.createChannel() return NextResponse.json(channel) } diff --git a/src/app/api/destroy/route.ts b/src/app/api/destroy/route.ts index a3a240d..ed08bbe 100644 --- a/src/app/api/destroy/route.ts +++ b/src/app/api/destroy/route.ts @@ -13,7 +13,7 @@ export async function POST(request: NextRequest): Promise { } try { - await channelRepo.destroy(slug, secret) + await channelRepo.destroyChannel(slug, secret) return NextResponse.json({ success: true }, { status: 200 }) } catch (error) { return NextResponse.json( diff --git a/src/app/api/offer/route.ts b/src/app/api/offer/route.ts index 902271c..aafd05e 100644 --- a/src/app/api/offer/route.ts +++ b/src/app/api/offer/route.ts @@ -12,6 +12,6 @@ export async function POST(request: NextRequest): Promise { return NextResponse.json({ error: 'Offer is required' }, { status: 400 }) } - await channelRepo.offer(slug, offer) - return NextResponse.json({ success: true }) + const offerID = await channelRepo.offer(slug, offer) + return NextResponse.json({ offerID }) } diff --git a/src/app/api/renew/route.ts b/src/app/api/renew/route.ts index 58ffd0b..bdb18cf 100644 --- a/src/app/api/renew/route.ts +++ b/src/app/api/renew/route.ts @@ -12,6 +12,6 @@ export async function POST(request: NextRequest): Promise { return NextResponse.json({ error: 'Secret is required' }, { status: 400 }) } - const offers = await channelRepo.renew(slug, secret) + const offers = await channelRepo.renewChannel(slug, secret) return NextResponse.json({ success: true, offers }) } diff --git a/src/app/download/[...slug]/page.tsx b/src/app/download/[...slug]/page.tsx index 8a8cad1..d8818f5 100644 --- a/src/app/download/[...slug]/page.tsx +++ b/src/app/download/[...slug]/page.tsx @@ -19,7 +19,7 @@ export default async function DownloadPage({ params: { slug: string[] } }): Promise { const slug = normalizeSlug(params.slug) - const channel = await channelRepo.fetch(slug) + const channel = await channelRepo.fetchChannel(slug) if (!channel) { notFound() diff --git a/src/channel.ts b/src/channel.ts index def7cc2..50d3d8f 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -18,14 +18,26 @@ const ChannelSchema = z.object({ }) export interface ChannelRepo { - create(ttl?: number): Promise - fetch(slug: string): Promise - renew( + createChannel(ttl?: number): Promise + fetchChannel(slug: string): Promise + renewChannel( slug: string, secret: string, ttl: number, - ): Promise - destroy(slug: string, secret: string): Promise + ): Promise> + destroyChannel(slug: string, secret: string): Promise + offer( + slug: string, + offer: RTCSessionDescriptionInit, + ttl: number, + ): Promise + answer( + slug: string, + offerID: string, + answer: RTCSessionDescriptionInit, + ttl: number, + ): Promise + fetchAnswer(slug: string, offerID: string): Promise } export class RedisChannelRepo implements ChannelRepo { @@ -35,7 +47,7 @@ export class RedisChannelRepo implements ChannelRepo { this.client = new Redis(redisURL) } - async create(ttl: number = config.channel.ttl): Promise { + async createChannel(ttl: number = config.channel.ttl): Promise { const shortSlug = await this.generateShortSlug() const longSlug = await this.generateLongSlug() @@ -52,7 +64,7 @@ export class RedisChannelRepo implements ChannelRepo { return channel } - async fetch(slug: string, scrubSecret = false): Promise { + async fetchChannel(slug: string, scrubSecret = false): Promise { const shortChannelStr = await this.client.get(this.getShortSlugKey(slug)) if (shortChannelStr) { return this.deserializeChannel(shortChannelStr, scrubSecret) @@ -66,32 +78,28 @@ export class RedisChannelRepo implements ChannelRepo { return null } - async renew( + async renewChannel( slug: string, secret: string, ttl: number = config.channel.ttl, - ): Promise { - const channel = await this.fetch(slug) + ): Promise> { + const channel = await this.fetchChannel(slug) if (!channel || channel.secret !== secret) { - return [] + return {} } await this.client.expire(this.getLongSlugKey(channel.longSlug), ttl) await this.client.expire(this.getShortSlugKey(channel.shortSlug), ttl) const offerKey = this.getOfferKey(channel.shortSlug) - const offers = await this.client.lrange(offerKey, 0, -1) - if (offers.length > 0) { - return offers.map((offer) => - JSON.parse(offer), - ) as RTCSessionDescriptionInit[] - } - - return [] + const offers = await this.client.hgetall(offerKey) + return Object.fromEntries( + Object.entries(offers).map(([offerID, offer]) => [offerID, JSON.parse(offer)]), + ) as Record } - async destroy(slug: string, secret: string): Promise { - const channel = await this.fetch(slug) + async destroyChannel(slug: string, secret: string): Promise { + const channel = await this.fetchChannel(slug) if (!channel || channel.secret !== secret) { return } @@ -104,18 +112,48 @@ export class RedisChannelRepo implements ChannelRepo { slug: string, offer: RTCSessionDescriptionInit, ttl: number = config.channel.ttl, - ): Promise { - const channel = await this.fetch(slug) + ): Promise { + const channel = await this.fetchChannel(slug) if (!channel) { - return + return '' } + const offerID = crypto.randomUUID() const offerKey = this.getOfferKey(channel.shortSlug) - await this.client.rpush(offerKey, JSON.stringify(offer)) + await this.client.hset(offerKey, offerID, JSON.stringify(offer)) await this.client.expire(offerKey, ttl) - await this.client.expire(this.getLongSlugKey(channel.longSlug), ttl) - await this.client.expire(this.getShortSlugKey(channel.shortSlug), ttl) + return offerID + } + + async answer( + slug: string, + offerID: string, + answer: RTCSessionDescriptionInit, + ttl: number = config.channel.ttl, + ): Promise { + const channel = await this.fetchChannel(slug) + if (!channel) { + return false + } + + const answerKey = this.getAnswerKey(channel.shortSlug, offerID) + await this.client.setex(answerKey, ttl, JSON.stringify(answer)) + + const offerKey = this.getOfferKey(channel.shortSlug) + await this.client.hdel(offerKey, offerID) + + return true + } + + async fetchAnswer(slug: string, offerID: string): Promise { + const answerKey = this.getAnswerKey(slug, offerID) + const answer = await this.client.get(answerKey) + if (answer) { + return JSON.parse(answer) as RTCSessionDescriptionInit + } + + return null } private async generateShortSlug(): Promise { @@ -154,6 +192,10 @@ export class RedisChannelRepo implements ChannelRepo { return `offers:${shortSlug}` } + private getAnswerKey(shortSlug: string, offerID: string): string { + return `answers:${shortSlug}:${offerID}` + } + private serializeChannel(channel: Channel): string { return JSON.stringify(channel) } diff --git a/src/hooks/useDownloader.ts b/src/hooks/useDownloader.ts index 23cb09c..7bab3f8 100644 --- a/src/hooks/useDownloader.ts +++ b/src/hooks/useDownloader.ts @@ -15,7 +15,7 @@ import { mobileVendor, mobileModel, } from 'react-device-detect' -import { useQuery } from '@tanstack/react-query' +import { useQuery, useMutation } from '@tanstack/react-query' const cleanErrorMessage = (errorMessage: string): string => errorMessage.startsWith('Could not connect to peer') ? 'Could not connect to the uploader. Did they close their browser?' @@ -72,7 +72,7 @@ export function useDownloader(slug: string): { throw new Error('Could not offer connection to uploader') } const data = await response.json() - return data.success + return { offerID: data.offerID, offer } }, refetchOnWindowFocus: false, refetchOnMount: false, @@ -80,6 +80,69 @@ export function useDownloader(slug: string): { staleTime: Infinity, }) + const answerCheckMutation = useMutation({ + mutationFn: async (body: { slug: string, offerID: string }) => { + const response = await fetch(`/api/answer?slug=${body.slug}&offerID=${body.offerID}`, { + method: 'GET', + headers: { 'Content-Type': 'application/json' }, + }) + if (!response.ok) { + throw new Error('Network response was not ok') + } + return response.json() + } + }) + + useEffect(() => { + if (!offerData || isConnected) return + + let timeout: NodeJS.Timeout | null = null + + const run = (): void => { + timeout = setTimeout(() => { + console.log('Checking for answer', offerData) + answerCheckMutation.mutate( + { slug, offerID: offerData?.offerID }, + { + onSuccess: (data) => { + if (data.answer) { + console.log('Answer check success', data) + setIsConnected(true) + if (timeout) clearTimeout(timeout) + + peer.setRemoteDescription(data.answer).then(() => { + peer.addIceCandidate() + const conn = peer.createDataChannel('download') + conn.onopen = () => { + console.log('Connection opened') + } + conn.onerror = (e) => { + console.error('Error setting remote description', e) + } + conn.onclose = () => { + console.log('Connection closed') + } + }).catch((e) => { + console.error('Error setting remote description', e) + }) + } + }, + onError: (e) => { + console.error('Error checking for answer', e) + }, + }, + ) + run() + }, 1000) + } + + run() + + return () => { + if (timeout) clearTimeout(timeout) + } + }, [offerData, isConnected]) + useEffect(() => { return // const conn = peer.connect(slug, { reliable: true }) diff --git a/src/hooks/useUploaderChannel.ts b/src/hooks/useUploaderChannel.ts index 9f4a44c..8d78c2f 100644 --- a/src/hooks/useUploaderChannel.ts +++ b/src/hooks/useUploaderChannel.ts @@ -63,7 +63,19 @@ export function useUploaderChannel( }, }) - // TODO(@kern): add a way to post an answer back to the client + const answerMutation = useMutation({ + mutationFn: async ({ offerID, answer }: { offerID: string, answer: RTCSessionDescriptionInit }) => { + const response = await fetch('/api/answer', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ slug: shortSlug, offerID, answer }), + }) + if (!response.ok) { + throw new Error('Network response was not ok') + } + return response.json() + }, + }) useEffect(() => { if (!secret || !shortSlug) return @@ -76,13 +88,16 @@ export function useUploaderChannel( { secret }, { onSuccess: (d) => { - d.offers.forEach(async (offer) => { + Object.entries(d.offers).forEach(async ([offerID, offer]) => { try { - const answer = await peer.createAnswer(offer) + const answer = await peer.createAnswer(offer as RTCSessionDescriptionInit) + peer.onDataChannel((channel) => { + console.log('Data channel opened', channel) + }) console.log('Created answer:', answer) - // TODO: Send this answer back to the client - } catch (error) { - console.error('Error creating answer:', error) + answerMutation.mutate({ offerID, answer }) + } catch (e) { + console.error('Error creating answer:', e) } }) }, @@ -97,7 +112,7 @@ export function useUploaderChannel( return () => { if (timeout) clearTimeout(timeout) } - }, [secret, shortSlug, renewMutation, renewInterval]) + }, [secret, shortSlug, renewMutation, answerMutation, renewInterval]) return { isLoading,