work in progress. caching moved to sync IO code (simplifies things).

This was SVN commit r1017.
This commit is contained in:
janwas 2004-08-18 02:12:52 +00:00
parent e2970992fd
commit d58e16e0ff
2 changed files with 270 additions and 325 deletions

View File

@ -509,21 +509,145 @@ int file_close(File* const f)
///////////////////////////////////////////////////////////////////////////////
// starts transferring to/from the given buffer.
// no attempt is made at aligning or padding the transfer.
int file_start_io(File* const f, const off_t ofs, size_t size, void* const p, FileIO* io)
{
#ifdef PARANOIA
debug_out("file_start_io ofs=%d size=%d\n", ofs, size);
#endif
//
// check params
//
CHECK_FILE(f);
if(!size || !p || !io)
return ERR_INVALID_PARAM;
const bool is_write = (f->flags & FILE_WRITE) != 0;
// cut off at EOF.
if(!is_write)
{
// avoid min() due to type conversion warnings.
const off_t bytes_left = f->size - ofs;
if(bytes_left < 0)
{
debug_warn("file_start_io: EOF");
return ERR_EOF;
}
if((off_t)size > bytes_left)
size = (size_t)bytes_left;
// guaranteed to fit, since size was > bytes_left
}
// set the "I/O context", a pointer to the (newly allocated) aiocb.
// we can't store the whole aiocb in a struct - glibc's version is
// 144 bytes large. we don't currently need anything else (since this
// code is only a thin aio wrapper); if that changes, instead return a
// pointer to a struct containing the aiocb*.
aiocb* cb = (aiocb*)malloc(sizeof(aiocb));
if(!cb)
return ERR_NO_MEM;
*io = cb;
// send off async read/write request
cb->aio_lio_opcode = is_write? LIO_WRITE : LIO_READ;
cb->aio_buf = p;
cb->aio_fildes = f->fd;
cb->aio_offset = ofs;
cb->aio_nbytes = size;
int err = lio_listio(LIO_NOWAIT, &cb, 1, (struct sigevent*)0);
if(err < 0)
{
file_discard_io(*io);
return err;
}
return 0;
}
// indicates if the IO referenced by <io> has completed.
// return value: 0 if pending, 1 if complete, < 0 on error.
int file_io_complete(FileIO io)
{
aiocb* cb = (aiocb*)io;
int ret = aio_error(cb);
if(ret == EINPROGRESS)
return 0;
if(ret == 0)
return 1;
debug_warn("file_io_complete: unexpected aio_error return");
return -1;
}
int file_wait_io(FileIO io, void*& p, size_t& size)
{
#ifdef PARANOIA
debug_out("file_wait_io: hio=%I64x\n", hio);
#endif
// zero output params in case something (e.g. H_DEREF) fails.
p = 0;
size = 0;
aiocb* cb = (aiocb*)io;
// wait for transfer to complete.
const aiocb** cbs = (const aiocb**)&cb; // pass in an "array"
while(aio_error(cb) == EINPROGRESS)
aio_suspend(cbs, 1, (timespec*)0); // wait indefinitely
// query number of bytes transferred (-1 if the transfer failed)
const ssize_t bytes_transferred = aio_return(cb);
if(bytes_transferred < (ssize_t)cb->aio_nbytes)
return -1;
p = (void*)cb->aio_buf; // cast from volatile void*
size = bytes_transferred;
return 0;
}
int file_discard_io(FileIO io)
{
memset(io, 0, sizeof(aiocb));
// discourage further use.
free(io);
return 0;
}
///////////////////////////////////////////////////////////////////////////////
// L3 cache: intended to cache raw compressed data, since files aren't aligned
// in the archive; alignment code would force a read of the whole block,
// which would be a slowdown unless we keep them in memory.
//
// it's a part of the async code (instead of the sync, block-splitting code)
// because if cached, the IO must not be issued. also, when waiting,
// we can return directly if in the cache.
// keep out of async code (although extra work for sync: must not issue/wait
// if was cached) to simplify things. disadvantage: problems if same block
// is issued twice, before the first call completes (via wait_io).
// that won't happen though unless we have threaded file_ios =>
// rare enough not to worry about performance.
//
// only allow caching for buffers allocated by async read code:
// 1) caller may pull the rug out from under us, freeing its buffer after
// calling file_discard_io. don't want to go to the trouble of getting
// the handle; even worse, buffer may be stack-allocated.
// since sync code allocates the (temp) buffer, it's guaranteed
// to remain valid.
//
// side effect: any async reads into buffers we allocate may be cached
// (if FILE_CACHE_BLOCK specified).
@ -554,280 +678,103 @@ static u64 block_make_id(const u32 fn_hash, const off_t ofs)
}
struct Block
{
FileIO* pending_io;
void* p;
Block()
{
pending_io = 0;
p = 0;
}
};
typedef std::map<u64, Block> BlockCache;
typedef std::map<u64, void*> BlockCache;
typedef BlockCache::iterator BlockIt;
static BlockCache block_cache;
enum FileIOFlags
ssize_t lowio(int fd, bool is_write, off_t ofs, size_t size, void* buf)
{
// coming from cache
// aio_return called
// we allocated buffer
lseek(fd, ofs, SEEK_SET);
if(is_write)
return write(fd, buf, size);
else
return read (fd, buf, size);
}
struct IOSlot
{
FileIO io;
void* temp_buf;
u64 block_id;
// needed so that we can add the block to the cache when
// its IO is complete. if we add it when issuing, we'd no longer be
// thread-safe: someone else might find it in the cache before its
// transfer has completed. don't want to add an "is_complete" flag,
// because that'd be hard to update (on every wait_io).
void* cached_block;
// != 0 <==> data coming from cache and no IO issued.
// given buffer
// given buffer, will copy from cache
// temp buffer allocated here
// temp buffer taken from cache
};
// pads the request up to BLOCK_SIZE, and stores the original parameters in IO.
// transfers of more than 1 block (including padding) are allowed, but do not
// go through the cache. don't see any case where that's necessary, though.
int file_start_io(File* const f, const off_t user_ofs, size_t user_size, void* const user_p, FileIO* io)
// don't just use operator[], so that block_cache isn't cluttered
// with IDs associated with 0 (blocks that wouldn't be cached anyway).
static void* block_find(u64 block_id)
{
int err;
memset(io, 0, sizeof(FileIO));
//
// check params
//
CHECK_FILE(f);
const bool is_write = (f->flags & FILE_WRITE) != 0;
const bool alloc_buf = (user_p == 0);
const bool cache_block = (f->flags & FILE_CACHE_BLOCK) != 0 && alloc_buf;
if(user_size == 0)
{
debug_warn("file_start_io: user_size = 0 - why?");
return ERR_INVALID_PARAM;
}
// cut off at EOF.
if(!is_write)
{
// avoid min() due to type conversion warnings.
const off_t bytes_left = f->size - user_ofs;
if(bytes_left < 0)
{
debug_warn("file_start_io: EOF");
return ERR_EOF;
}
if((off_t)user_size > bytes_left)
user_size = (size_t)bytes_left;
// guaranteed to fit, since size was > bytes_left
}
#ifdef PARANOIA
debug_out("file_start_io hio=%I64x ofs=%d size=%d\n", hio, user_ofs, user_size);
#endif
size_t padding = 0;
size_t size = user_size;
void* buf = user_p;
off_t ofs = user_ofs;
// we're supposed to allocate the buffer
if(alloc_buf)
{
if(is_write)
{
debug_warn("file_start_io: writing but buffer = 0");
return ERR_INVALID_PARAM;
}
// optimization: pad to eliminate a memcpy if unaligned
ofs = user_ofs;
padding = ofs % SECTOR_SIZE;
ofs -= (off_t)padding;
size = round_up(padding + user_size, SECTOR_SIZE);
buf = mem_alloc(size, SECTOR_SIZE);
if(!buf)
return ERR_NO_MEM;
}
// store request params needed by file_wait_io
io->cb = (aiocb*)calloc(sizeof(aiocb), 1);
// must be zeroed! (waio complains about req_ != 0)
if(!io->cb)
{
err = ERR_NO_MEM;
goto fail;
}
io->padding = padding;
io->user_size = user_size;
io->block_id = 0;
io->our_buf = alloc_buf;
// all other members zeroed by memset above.
// note: cb will hold the actual IO request
// (possibly aligned offset and size).
const u64 block_id = block_make_id(f->fn_hash, ofs);
//debug_out("ofs=%x\tid=%I64x", user_ofs, block_id);
// already in cache?
if(cache_block)
{
io->block_id = block_id;
Block& b = block_cache[block_id];
// yes; no need to issue
if(b.p)
{
// debug_out(".. hit\n");
io->from_cache = true;
return 0;
}
io->given_to_cache = true;
// debug_out(".. miss\n");
b.pending_io = io;
b.p = (void*)buf;
}
//else
//debug_out(".. uncacheable => miss\n");
// send off async read/write request
aiocb* cb = io->cb;
cb->aio_lio_opcode = is_write? LIO_WRITE : LIO_READ;
cb->aio_buf = buf;
cb->aio_fildes = f->fd;
cb->aio_offset = ofs;
cb->aio_nbytes = size;
err = lio_listio(LIO_NOWAIT, &cb, 1, (struct sigevent*)0);
if(err < 0)
{
fail:
file_discard_io(io);
if(alloc_buf)
mem_free(buf);
return err;
}
return 0;
}
// indicates if the IO referenced by <io> has completed.
// return value: 0 if pending, 1 if complete, < 0 on error.
int file_io_complete(FileIO* io)
{
int ret = aio_error(io->cb);
if(ret == EINPROGRESS)
BlockIt it = block_cache.find(block_id);
if(it == block_cache.end())
return 0;
if(ret == 0)
return 1;
debug_warn("file_io_complete: unexpected aio_error return");
return -1;
return it->second;
}
int file_wait_io(FileIO* io, void*& p, size_t& size)
static void block_add(u64 block_id, void* block)
{
#ifdef PARANOIA
debug_out("file_wait_io: hio=%I64x\n", hio);
#endif
// zero output params in case something (e.g. H_DEREF) fails.
p = 0;
size = 0;
// aio_return may be called exactly once.
// if user code must be able to call this > 1x, store bytes_transferred
// in FileIO.
if(io->return_called)
{
debug_warn("file_wait_io: already called");
return -1;
}
io->return_called = 1;
aiocb* cb = io->cb;
ssize_t bytes_transferred;
Block* b;
if(io->from_cache || io->given_to_cache)
{
b = &block_cache[io->block_id];
if(b->pending_io)
cb = b->pending_io->cb;
// already finished, no wait necessary
else
{
cb = 0;
p = b->p;
bytes_transferred = BLOCK_SIZE;
}
}
// wait for transfer to complete.
if(cb)
{
const aiocb** cbs = (const aiocb**)&cb; // pass in an "array"
while(aio_error(cb) == EINPROGRESS)
aio_suspend(cbs, 1, (timespec*)0); // wait indefinitely
// query number of bytes transferred (-1 if the transfer failed)
bytes_transferred = aio_return(cb);
p = (void*)cb->aio_buf; // cast from volatile void*
}
// mark block's pending IO as complete
if(io->from_cache || io->given_to_cache)
b->pending_io = 0;
if(bytes_transferred < (ssize_t)io->user_size)
return -1;
size = io->user_size;
// padding optimization: we rounded the start offset down
// to avoid a buffer memcpy in waio. skip past that
(char*&)p += io->padding;
return 0;
if(block_find(block_id))
debug_warn("block_add: already in cache");
else
block_cache[block_id] = block;
}
int file_discard_io(FileIO* io)
static ssize_t block_issue(File* f, IOSlot* slot, const off_t issue_ofs, void* buf)
{
if(!io->return_called)
{
debug_warn("file_discard_io: file_wait_io wasn't called yet");
return -1;
}
memset(slot, 0, sizeof(IOSlot));
if(io->our_buf && !io->given_to_cache)
mem_free(io->cb->aio_buf);
ssize_t issue_size = BLOCK_SIZE;
memset(io->cb, 0, sizeof(aiocb));
free(io->cb);
// check if in cache
slot->block_id = block_make_id(f->fn_hash, issue_ofs);
slot->cached_block = block_find(slot->block_id);
if(slot->cached_block)
goto skip_issue;
memset(io, 0, sizeof(FileIO));
return 0;
//debug_out("%x miss\n", issue_ofs);
// allocate temp buffer
if(!buf)
buf = slot->temp_buf = mem_alloc(BLOCK_SIZE, BLOCK_SIZE);
// if using buffer, set position in it; otherwise, use temp buffer
int err = file_start_io(f, issue_ofs, BLOCK_SIZE, buf, &slot->io);
if(err < 0)
issue_size = (ssize_t)err;
skip_issue:
return issue_size;
}
///////////////////////////////////////////////////////////////////////////////
// transfer modes:
// *p != 0: *p is the source/destination address for the transfer.
// (FILE_MEM_READONLY?)
@ -858,6 +805,10 @@ int file_discard_io(FileIO* io)
// the read into blocks, so aio's buffer won't have to cover the
// whole file.
ssize_t file_io(File* const f, const off_t data_ofs, size_t data_size, void** const p,
const FileIOCB cb, const uintptr_t ctx) // optional
{
@ -923,6 +874,7 @@ invalid:
// only align if we allocate the buffer and in AIO mode
const bool do_align = buf_type != USER && !no_aio;
const bool cache = buf_type == TEMP;
//
@ -966,16 +918,9 @@ invalid:
#endif
// transferring via lowio only.
// skip aio code, use lowio
if(no_aio)
{
lseek(f->fd, data_ofs, SEEK_SET);
if(is_write)
return write(f->fd, data_buf, data_size);
else
return read(f->fd, data_buf, data_size);
}
return lowio(f->fd, is_write, data_ofs, data_size, data_buf);
//
@ -984,7 +929,8 @@ invalid:
//
const int MAX_IOS = 4;
FileIO ios[MAX_IOS] = { 0 };
IOSlot ios[MAX_IOS] = { {0} };
int head = 0;
int tail = 0;
@ -1002,6 +948,7 @@ invalid:
ssize_t err = +1; // loop terminates if <= 0
size_t remaining_blocks = actual_size / BLOCK_SIZE;
for(;;)
{
@ -1009,60 +956,66 @@ invalid:
// start transferring next block.
if(pending_ios < MAX_IOS && !all_issued && err > 0)
{
// calculate issue_size:
// at most, transfer up to the next block boundary.
off_t issue_ofs = (off_t)(actual_ofs + issue_cnt);
size_t issue_size = BLOCK_SIZE;
if(!do_align)
{
const size_t left_in_block = BLOCK_SIZE - (issue_ofs % BLOCK_SIZE);
const size_t total_left = data_size - issue_cnt;
issue_size = MIN(left_in_block, total_left);
}
// get next free IO slot in ring buffer
FileIO* io = &ios[head];
IOSlot* slot = &ios[head];
head = (head + 1) % MAX_IOS;
pending_ios++;
// if using buffer, set position in it; otherwise, 0 (temp)
void* data = (buf_type == TEMP)? 0 : (char*)actual_buf + issue_cnt;
int ret = file_start_io(f, issue_ofs, issue_size, data, io);
if(ret < 0)
err = (ssize_t)ret;
off_t issue_ofs = (off_t)(actual_ofs + issue_cnt);
void* buf = (buf_type == TEMP)? 0 : (char*)actual_buf + issue_cnt;
ssize_t issued = block_issue(f, slot, issue_ofs, buf);
if(issued < 0)
err = issued;
// transfer failed - loop will now terminate after
// waiting for all pending transfers to complete.
issue_cnt += issue_size;
issue_cnt += issued;
if(issue_cnt >= actual_size)
all_issued = true;
}
// IO pending: wait for it to complete, and process it.
else if(pending_ios)
{
FileIO* io = &ios[tail];
IOSlot* slot = &ios[tail];
tail = (tail + 1) % MAX_IOS;
pending_ios--;
void* block;
size_t size;
int ret = file_wait_io(io, block, size);
if(ret < 0)
err = (ssize_t)ret;
void* block = slot->cached_block;
size_t size = BLOCK_SIZE;
// wasn't in cache; it was issued, so wait for it
bool from_cache;
if(block)
from_cache = true;
else
{
from_cache = false;
// first time; skip past padding
void* data = block;
if(raw_transferred_cnt == 0)
{
(char*&)data += lead_padding;
size -= lead_padding;
}
int ret = file_wait_io(slot->io, block, size);
if(ret < 0)
err = (ssize_t)ret;
}
// first time; skip past padding
void* data = block;
if(raw_transferred_cnt == 0)
{
(char*&)data += lead_padding;
size -= lead_padding;
}
// don't include trailing padding
if(raw_transferred_cnt + size > data_size)
size = data_size - raw_transferred_cnt;
// we have useable data from a previous temp buffer,
// but it needs to be copied into the user's buffer
if(from_cache && buf_type != TEMP)
memcpy((char*)data_buf+raw_transferred_cnt, data, size);
// don't include trailing padding
if(raw_transferred_cnt + size > data_size)
size = data_size - raw_transferred_cnt;
//// if size comes out short, we must be at EOF
@ -1085,7 +1038,15 @@ invalid:
else
actual_transferred_cnt += size;
file_discard_io(io);
if(!from_cache)
file_discard_io(slot->io);
if(buf_type == TEMP)
{
// adding is allowed and we didn't take this from the cache already: add
if(!slot->cached_block)
block_add(slot->block_id, slot->temp_buf);
}
}
// (all issued OR error) AND no pending transfers - done.
else

View File

@ -127,33 +127,17 @@ extern int file_unmap(File* f);
// async IO
//
struct FileIO
{
u64 block_id;
// set by file_start_io when in block-cache mode, otherwise 0.
aiocb* cb;
// large (144 bytes) on Linux; cannot store here.
// allocated in file_start_io.
size_t padding;
size_t user_size;
bool our_buf;
bool from_cache;
bool given_to_cache;
bool return_called;
};
typedef void* FileIO;
extern int file_start_io(File* f, off_t ofs, size_t size, void* buf, FileIO* io);
// indicates if the given IO has completed.
// return value: 0 if pending, 1 if complete, < 0 on error.
extern int file_io_complete(FileIO* io);
extern int file_io_complete(FileIO io);
extern int file_wait_io(FileIO* io, void*& p, size_t& size);
extern int file_wait_io(FileIO io, void*& p, size_t& size);
extern int file_discard_io(FileIO* io);
extern int file_discard_io(FileIO io);