Add chunk uploading, begin chunk downloading

pull/152/head
Alex Kern 5 years ago
parent 1ceca18cc0
commit 34c45257f4
No known key found for this signature in database
GPG Key ID: F3141D5EDF48F89F

@ -1,4 +1,4 @@
import React, { useCallback, useEffect, useState } from 'react'
import React, { useCallback, useEffect, useRef, useState } from 'react'
import { useWebRTC } from './WebRTCProvider'
import {
browserName,
@ -9,8 +9,9 @@ import {
mobileModel,
} from 'react-device-detect'
import * as t from 'io-ts'
import { decodeMessage, Message, MessageType } from '../messages'
import { ChunkMessage, decodeMessage, Message, MessageType } from '../messages'
import { createZipStream } from '../zip-stream'
import { DataConnection } from 'peerjs'
const baseURL = process.env.NEXT_PUBLIC_BASE_URL ?? 'http://localhost:3000'
@ -82,6 +83,17 @@ export default function Downloader({
const peer = useWebRTC()
const [password, setPassword] = useState('')
const [dataConnection, setDataConnection] = useState<DataConnection | null>(
null,
)
const [filesInfo, setFilesInfo] = useState<Array<{
fullPath: string
size: number
type: string
}> | null>(null)
const processChunk = useRef<
((message: t.TypeOf<typeof ChunkMessage>) => void) | null
>(null)
const [shouldAttemptConnection, setShouldAttemptConnection] = useState(false)
const [open, setOpen] = useState(false)
const [downloading, setDownloading] = useState(false)
@ -96,6 +108,8 @@ export default function Downloader({
reliable: true,
})
setDataConnection(conn)
conn.on('open', () => {
setOpen(true)
@ -118,7 +132,11 @@ export default function Downloader({
const message = decodeMessage(data)
switch (message.type) {
case MessageType.Info:
console.log(message)
setFilesInfo(message.files)
break
case MessageType.Chunk:
if (processChunk.current) processChunk.current(message)
break
case MessageType.Error:
@ -133,6 +151,7 @@ export default function Downloader({
})
conn.on('close', () => {
setDataConnection(null)
setOpen(false)
setDownloading(false)
setShouldAttemptConnection(false)
@ -158,16 +177,65 @@ export default function Downloader({
const handleStartDownload = useCallback(() => {
setDownloading(true)
// TODO(@kern): Download each file as a ReadableStream
// const blob = new Blob(['support blobs too'])
// const file = {
// name: 'blob-example.txt',
// size: 12,
// stream: () => blob.stream(),
// }
// streamDownloadSingleFile(file)
// streamDownloadMultipleFiles([file])
}, [])
const fileStreams = filesInfo.map((_info) => {
return new ReadableStream({
start(ctrl) {
console.log('START')
console.log(ctrl)
},
async pull(ctrl) {
console.log('PULL')
console.log(ctrl)
},
})
})
const fileStreamByPath: Record<string, ReadableStream> = {}
fileStreams.forEach((stream, i) => {
fileStreamByPath[filesInfo[i].fullPath] = stream
})
const processChunkFunc = (message: t.TypeOf<typeof ChunkMessage>): void => {
const stream = fileStreamByPath[message.fullPath]
if (!stream) {
console.error('no stream found for ' + message.fullPath)
return
}
console.log(stream)
}
processChunk.current = processChunkFunc
const downloads = filesInfo.map((info, i) => ({
name: info.fullPath.replace(/^\//, ''),
size: info.size,
stream: () => fileStreams[i],
}))
let downloadPromise: Promise<void> | null = null
if (downloads.length > 1) {
downloadPromise = streamDownloadMultipleFiles(downloads)
} else if (downloads.length === 1) {
downloadPromise = streamDownloadSingleFile(downloads[0])
} else {
throw new Error('no files to download')
}
downloadPromise
.then(() => {
console.log('DONE')
})
.catch((err) => {
console.error(err)
})
const request: t.TypeOf<typeof Message> = {
type: MessageType.Start,
fullPath: filesInfo[0].fullPath,
offset: 0,
}
dataConnection.send(request)
}, [dataConnection, filesInfo])
if (downloading) {
return <div>Downloading</div>

@ -9,11 +9,11 @@ import * as t from 'io-ts'
enum UploaderConnectionStatus {
Pending = 'PENDING',
Paused = 'PAUSED',
Uploading = 'UPLOADING',
Done = 'DONE',
InvalidPassword = 'INVALID_PASSWORD',
Closed = 'CLOSED',
Paused = 'PAUSED',
}
type UploaderConnection = {
@ -25,9 +25,14 @@ type UploaderConnection = {
osVersion?: string
mobileVendor?: string
mobileModel?: string
uploadingFullPath?: string
uploadingOffset?: number
}
// TODO(@kern): Use better values
const RENEW_INTERVAL = 5000 // 20 minutes
// const MAX_CHUNK_SIZE = 1024 * 1024 // 1 Mi
const MAX_CHUNK_SIZE = 1
function useUploaderChannel(
uploaderPeerID: string,
@ -86,6 +91,20 @@ function useUploaderChannelRenewal(shortSlug: string): void {
}, [shortSlug])
}
function validateOffset(
files: UploadedFile[],
fullPath: string,
offset: number,
): UploadedFile {
const validFile = files.find(
(file) => file.fullPath === fullPath && offset <= file.size,
)
if (!validFile) {
throw new Error('invalid file offset')
}
return validFile
}
function useUploaderConnections(
peer: Peer,
files: UploadedFile[],
@ -95,6 +114,7 @@ function useUploaderConnections(
useEffect(() => {
peer.on('connection', (conn: DataConnection) => {
let sendChunkTimeout: number | null = null
const newConn = {
status: UploaderConnectionStatus.Pending,
dataConnection: conn,
@ -151,7 +171,7 @@ function useUploaderConnections(
return
}
draft.status = UploaderConnectionStatus.Uploading
draft.status = UploaderConnectionStatus.Paused
draft.browserName = message.browserName
draft.browserVersion = message.browserVersion
draft.osName = message.osName
@ -163,6 +183,8 @@ function useUploaderConnections(
const fileInfo = files.map((f) => {
return {
fullPath: f.fullPath,
size: f.size,
type: f.type,
}
})
@ -171,10 +193,65 @@ function useUploaderConnections(
files: fileInfo,
}
conn.send(request)
break
}
case MessageType.Start: {
const fullPath = message.fullPath
let offset = message.offset
const file = validateOffset(files, fullPath, offset)
updateConnection((draft) => {
if (draft.status !== UploaderConnectionStatus.Paused) {
return
}
draft.status = UploaderConnectionStatus.Uploading
draft.uploadingFullPath = fullPath
draft.uploadingOffset = offset
})
const sendNextChunk = () => {
const end = Math.min(file.size, offset + MAX_CHUNK_SIZE)
const chunkSize = end - offset
const request: t.TypeOf<typeof Message> = {
type: MessageType.Chunk,
fullPath,
offset,
bytes: file.slice(offset, end),
}
conn.send(request)
updateConnection((draft) => {
offset = end
draft.uploadingOffset = end
if (chunkSize < MAX_CHUNK_SIZE) {
draft.status = UploaderConnectionStatus.Paused
} else {
sendChunkTimeout = setTimeout(() => {
sendNextChunk()
}, 0)
}
})
}
sendNextChunk()
// TODO(@kern): Handle sending chunks
break
}
case MessageType.Pause: {
updateConnection((draft) => {
if (draft.status !== UploaderConnectionStatus.Uploading) {
return
}
draft.status = UploaderConnectionStatus.Paused
if (sendChunkTimeout) {
clearTimeout(sendChunkTimeout)
sendChunkTimeout = null
}
})
}
}
} catch (err) {
console.error(err)
@ -182,6 +259,10 @@ function useUploaderConnections(
})
conn.on('close', (): void => {
if (sendChunkTimeout) {
clearTimeout(sendChunkTimeout)
}
updateConnection((draft) => {
if (draft.status === UploaderConnectionStatus.InvalidPassword) {
return

@ -5,13 +5,14 @@ import { fold } from 'fp-ts/Either'
export enum MessageType {
RequestInfo = 'REQUEST_INFO',
Info = 'INFO',
Pause = 'PAUSE',
Start = 'START',
Chunk = 'CHUNK',
Pause = 'PAUSE',
Done = 'DONE',
Error = 'ERROR',
}
const RequestInfoMessage = t.type({
export const RequestInfoMessage = t.type({
type: t.literal(MessageType.RequestInfo),
browserName: t.string,
browserVersion: t.string,
@ -22,37 +23,39 @@ const RequestInfoMessage = t.type({
password: t.string,
})
const InfoMessage = t.type({
export const InfoMessage = t.type({
type: t.literal(MessageType.Info),
files: t.array(
t.type({
fullPath: t.string,
size: t.number,
type: t.string,
}),
),
})
const StartMessage = t.type({
export const StartMessage = t.type({
type: t.literal(MessageType.Start),
browserName: t.string,
browserVersion: t.string,
osName: t.string,
osVersion: t.string,
mobileVendor: t.string,
mobileModel: t.string,
password: t.string,
fullPath: t.string,
offset: t.number,
})
const ChunkMessage = t.type({
export const ChunkMessage = t.type({
type: t.literal(MessageType.Chunk),
// TODO(@kern): Chunk
fullPath: t.string,
offset: t.number,
bytes: t.unknown,
})
const PauseMessage = t.type({
export const PauseMessage = t.type({
type: t.literal(MessageType.Pause),
// TODO(@kern): Pausing
})
const ErrorMessage = t.type({
export const DoneMessage = t.type({
type: t.literal(MessageType.Done),
})
export const ErrorMessage = t.type({
type: t.literal(MessageType.Error),
error: t.string,
})
@ -60,9 +63,10 @@ const ErrorMessage = t.type({
export const Message = t.union([
RequestInfoMessage,
InfoMessage,
PauseMessage,
StartMessage,
ChunkMessage,
PauseMessage,
DoneMessage,
ErrorMessage,
])

Loading…
Cancel
Save