blob: 42984021ec9e808e39df9aa33c1faf23a169a4d9 [file] [log] [blame]
/*
* FIO engines for DDN's Infinite Memory Engine.
* This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
*
* Copyright (C) 2018 DataDirect Networks. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License,
* version 2 as published by the Free Software Foundation..
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/
/*
* Some details about the new engines are given below:
*
*
* ime_psync:
* Most basic engine that issues calls to ime_native whenever an IO is queued.
*
* ime_psyncv:
* This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
* iodepth_batch). It refuses to queue when the iovecs can't be appended, and
* waits for FIO to issue a commit. After a call to commit and get_events, new
* IOs can be queued.
*
* ime_aio:
* This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
* iodepth_batch). When the iovecs can't be appended to the current request, a
* new request for IME is created. These requests will be issued to IME when
* commit is called. Contrary to ime_psyncv, there can be several requests at
* once. We don't need to wait for a request to terminate before creating a new
* one.
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <linux/limits.h>
#include <ime_native.h>
#include "../fio.h"
/**************************************************************
* Types and constants definitions
*
**************************************************************/
/* define constants for async IOs */
#define FIO_IME_IN_PROGRESS -1
#define FIO_IME_REQ_ERROR -2
/* This flag is used when some jobs were created using threads. In that
case, IME can't be finalized in the engine-specific cleanup function,
because other threads might still use IME. Instead, IME is finalized
in the destructor (see fio_ime_unregister), only when the flag
fio_ime_is_initialized is true (which means at least one thread has
initialized IME). */
static bool fio_ime_is_initialized = false;
struct imesio_req {
int fd; /* File descriptor */
enum fio_ddir ddir; /* Type of IO (read or write) */
off_t offset; /* File offset */
};
struct imeaio_req {
struct ime_aiocb iocb; /* IME aio request */
ssize_t status; /* Status of the IME request */
enum fio_ddir ddir; /* Type of IO (read or write) */
pthread_cond_t cond_endio; /* Condition var to notify FIO */
pthread_mutex_t status_mutex; /* Mutex for cond_endio */
};
/* This structure will be used for 2 engines: ime_psyncv and ime_aio */
struct ime_data {
union {
struct imeaio_req *aioreqs; /* array of aio requests */
struct imesio_req *sioreq; /* pointer to the only syncio request */
};
struct iovec *iovecs; /* array of queued iovecs */
struct io_u **io_us; /* array of queued io_u pointers */
struct io_u **event_io_us; /* array of the events retieved afer get_events*/
unsigned int queued; /* iovecs/io_us in the queue */
unsigned int events; /* number of committed iovecs/io_us */
/* variables used to implement a "ring" queue */
unsigned int depth; /* max entries in the queue */
unsigned int head; /* index used to append */
unsigned int tail; /* index used to pop */
unsigned int cur_commit; /* index of the first uncommitted req */
/* offset used by the last iovec (used to check if the iovecs can be appended)*/
unsigned long long last_offset;
/* The variables below are used for aio only */
struct imeaio_req *last_req; /* last request awaiting committing */
};
/**************************************************************
* Private functions for queueing/unqueueing
*
**************************************************************/
static void fio_ime_queue_incr (struct ime_data *ime_d)
{
ime_d->head = (ime_d->head + 1) % ime_d->depth;
ime_d->queued++;
}
static void fio_ime_queue_red (struct ime_data *ime_d)
{
ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
ime_d->queued--;
ime_d->events--;
}
static void fio_ime_queue_commit (struct ime_data *ime_d, int iovcnt)
{
ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
ime_d->events += iovcnt;
}
static void fio_ime_queue_reset (struct ime_data *ime_d)
{
ime_d->head = 0;
ime_d->tail = 0;
ime_d->cur_commit = 0;
ime_d->queued = 0;
ime_d->events = 0;
}
/**************************************************************
* General IME functions
* (needed for both sync and async IOs)
**************************************************************/
static char *fio_set_ime_filename(char* filename)
{
static __thread char ime_filename[PATH_MAX];
int ret;
ret = snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename);
if (ret < PATH_MAX)
return ime_filename;
return NULL;
}
static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
{
struct stat buf;
int ret;
char *ime_filename;
dprint(FD_FILE, "get file size %s\n", f->file_name);
ime_filename = fio_set_ime_filename(f->file_name);
if (ime_filename == NULL)
return 1;
ret = ime_native_stat(ime_filename, &buf);
if (ret == -1) {
td_verror(td, errno, "fstat");
return 1;
}
f->real_file_size = buf.st_size;
return 0;
}
/* This functions mimics the generic_file_open function, but issues
IME native calls instead of POSIX calls. */
static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
{
int flags = 0;
int ret;
uint64_t desired_fs;
char *ime_filename;
dprint(FD_FILE, "fd open %s\n", f->file_name);
if (td_trim(td)) {
td_verror(td, EINVAL, "IME does not support TRIM operation");
return 1;
}
if (td->o.oatomic) {
td_verror(td, EINVAL, "IME does not support atomic IO");
return 1;
}
if (td->o.odirect)
flags |= O_DIRECT;
if (td->o.sync_io)
flags |= O_SYNC;
if (td->o.create_on_open && td->o.allow_create)
flags |= O_CREAT;
if (td_write(td)) {
if (!read_only)
flags |= O_RDWR;
if (td->o.allow_create)
flags |= O_CREAT;
} else if (td_read(td)) {
flags |= O_RDONLY;
} else {
/* We should never go here. */
td_verror(td, EINVAL, "Unsopported open mode");
return 1;
}
ime_filename = fio_set_ime_filename(f->file_name);
if (ime_filename == NULL)
return 1;
f->fd = ime_native_open(ime_filename, flags, 0600);
if (f->fd == -1) {
char buf[FIO_VERROR_SIZE];
int __e = errno;
snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
td_verror(td, __e, buf);
return 1;
}
/* Now we need to make sure the real file size is sufficient for FIO
to do its things. This is normally done before the file open function
is called, but because FIO would use POSIX calls, we need to do it
ourselves */
ret = fio_ime_get_file_size(td, f);
if (ret < 0) {
ime_native_close(f->fd);
td_verror(td, errno, "ime_get_file_size");
return 1;
}
desired_fs = f->io_size + f->file_offset;
if (td_write(td)) {
dprint(FD_FILE, "Laying out file %s%s\n",
DEFAULT_IME_FILE_PREFIX, f->file_name);
if (!td->o.create_on_open &&
f->real_file_size < desired_fs &&
ime_native_ftruncate(f->fd, desired_fs) < 0) {
ime_native_close(f->fd);
td_verror(td, errno, "ime_native_ftruncate");
return 1;
}
if (f->real_file_size < desired_fs)
f->real_file_size = desired_fs;
} else if (td_read(td) && f->real_file_size < desired_fs) {
ime_native_close(f->fd);
log_err("error: can't read %lu bytes from file with "
"%lu bytes\n", desired_fs, f->real_file_size);
return 1;
}
return 0;
}
static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
{
int ret = 0;
dprint(FD_FILE, "fd close %s\n", f->file_name);
if (ime_native_close(f->fd) < 0)
ret = errno;
f->fd = -1;
return ret;
}
static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
{
char *ime_filename = fio_set_ime_filename(f->file_name);
int ret;
if (ime_filename == NULL)
return 1;
ret = unlink(ime_filename);
return ret < 0 ? errno : 0;
}
static struct io_u *fio_ime_event(struct thread_data *td, int event)
{
struct ime_data *ime_d = td->io_ops_data;
return ime_d->event_io_us[event];
}
/* Setup file used to replace get_file_sizes when settin up the file.
Instead we will set real_file_sie to 0 for each file. This way we
can avoid calling ime_native_init before the forks are created. */
static int fio_ime_setup(struct thread_data *td)
{
struct fio_file *f;
unsigned int i;
for_each_file(td, f, i) {
dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
f, i, f->file_name);
f->real_file_size = 0;
}
return 0;
}
static int fio_ime_engine_init(struct thread_data *td)
{
struct fio_file *f;
unsigned int i;
dprint(FD_IO, "ime engine init\n");
if (fio_ime_is_initialized && !td->o.use_thread) {
log_err("Warning: something might go wrong. Not all threads/forks were"
" created before the FIO jobs were initialized.\n");
}
ime_native_init();
fio_ime_is_initialized = true;
/* We have to temporarily set real_file_size so that
FIO can initialize properly. It will be corrected
on file open. */
for_each_file(td, f, i)
f->real_file_size = f->io_size + f->file_offset;
return 0;
}
static void fio_ime_engine_finalize(struct thread_data *td)
{
/* Only finalize IME when using forks */
if (!td->o.use_thread) {
if (ime_native_finalize() < 0)
log_err("error in ime_native_finalize\n");
fio_ime_is_initialized = false;
}
}
/**************************************************************
* Private functions for blocking IOs
* (without iovecs)
**************************************************************/
/* Notice: this function comes from the sync engine */
/* It is used by the commit function to return a proper code and fill
some attributes in the io_u used for the IO. */
static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
{
if (ret != (ssize_t) io_u->xfer_buflen) {
if (ret >= 0) {
io_u->resid = io_u->xfer_buflen - ret;
io_u->error = 0;
return FIO_Q_COMPLETED;
} else
io_u->error = errno;
}
if (io_u->error) {
io_u_log_error(td, io_u);
td_verror(td, io_u->error, "xfer");
}
return FIO_Q_COMPLETED;
}
static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
struct io_u *io_u)
{
struct fio_file *f = io_u->file;
ssize_t ret;
fio_ro_check(td, io_u);
if (io_u->ddir == DDIR_READ)
ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
else if (io_u->ddir == DDIR_WRITE)
ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
else if (io_u->ddir == DDIR_SYNC)
ret = ime_native_fsync(f->fd);
else {
ret = io_u->xfer_buflen;
io_u->error = EINVAL;
}
return fio_ime_psync_end(td, io_u, ret);
}
/**************************************************************
* Private functions for blocking IOs
* (with iovecs)
**************************************************************/
static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
{
/* We can only queue if:
- There are no queued iovecs
- Or if there is at least one:
- There must be no event waiting for retrieval
- The offsets must be contiguous
- The ddir and fd must be the same */
return (ime_d->queued == 0 || (
ime_d->events == 0 &&
ime_d->last_offset == io_u->offset &&
ime_d->sioreq->ddir == io_u->ddir &&
ime_d->sioreq->fd == io_u->file->fd));
}
/* Before using this function, we should have already
ensured that the queue is not full */
static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
{
struct imesio_req *ioreq = ime_d->sioreq;
struct iovec *iov = &ime_d->iovecs[ime_d->head];
iov->iov_base = io_u->xfer_buf;
iov->iov_len = io_u->xfer_buflen;
if (ime_d->queued == 0) {
ioreq->offset = io_u->offset;
ioreq->ddir = io_u->ddir;
ioreq->fd = io_u->file->fd;
}
ime_d->io_us[ime_d->head] = io_u;
ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
fio_ime_queue_incr(ime_d);
}
/* Tries to queue an IO. It will fail if the IO can't be appended to the
current request or if the current request has been committed but not
yet retrieved by get_events. */
static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
struct io_u *io_u)
{
struct ime_data *ime_d = td->io_ops_data;
fio_ro_check(td, io_u);
if (ime_d->queued == ime_d->depth)
return FIO_Q_BUSY;
if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
if (!fio_ime_psyncv_can_queue(ime_d, io_u))
return FIO_Q_BUSY;
dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
io_u->ddir, ime_d->head, ime_d->cur_commit,
ime_d->queued, ime_d->events);
fio_ime_psyncv_enqueue(ime_d, io_u);
return FIO_Q_QUEUED;
} else if (io_u->ddir == DDIR_SYNC) {
if (ime_native_fsync(io_u->file->fd) < 0) {
io_u->error = errno;
td_verror(td, io_u->error, "fsync");
}
return FIO_Q_COMPLETED;
} else {
io_u->error = EINVAL;
td_verror(td, io_u->error, "wrong ddir");
return FIO_Q_COMPLETED;
}
}
/* Notice: this function comes from the sync engine */
/* It is used by the commit function to return a proper code and fill
some attributes in the io_us appended to the current request. */
static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
{
struct ime_data *ime_d = td->io_ops_data;
struct io_u *io_u;
unsigned int i;
int err = errno;
for (i = 0; i < ime_d->queued; i++) {
io_u = ime_d->io_us[i];
if (bytes == -1)
io_u->error = err;
else {
unsigned int this_io;
this_io = bytes;
if (this_io > io_u->xfer_buflen)
this_io = io_u->xfer_buflen;
io_u->resid = io_u->xfer_buflen - this_io;
io_u->error = 0;
bytes -= this_io;
}
}
if (bytes == -1) {
td_verror(td, err, "xfer psyncv");
return -err;
}
return 0;
}
/* Commits the current request by calling ime_native (with one or several
iovecs). After this commit, the corresponding events (one per iovec)
can be retrieved by get_events. */
static int fio_ime_psyncv_commit(struct thread_data *td)
{
struct ime_data *ime_d = td->io_ops_data;
struct imesio_req *ioreq;
int ret = 0;
/* Exit if there are no (new) events to commit
or if the previous committed event haven't been retrieved */
if (!ime_d->queued || ime_d->events)
return 0;
ioreq = ime_d->sioreq;
ime_d->events = ime_d->queued;
if (ioreq->ddir == DDIR_READ)
ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
else
ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);
return fio_ime_psyncv_end(td, ret);
}
static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
unsigned int max, const struct timespec *t)
{
struct ime_data *ime_d = td->io_ops_data;
struct io_u *io_u;
int events = 0;
unsigned int count;
if (ime_d->events) {
for (count = 0; count < ime_d->events; count++) {
io_u = ime_d->io_us[count];
ime_d->event_io_us[events] = io_u;
events++;
}
fio_ime_queue_reset(ime_d);
}
dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
min, max, events, ime_d->queued, ime_d->events);
return events;
}
static int fio_ime_psyncv_init(struct thread_data *td)
{
struct ime_data *ime_d;
if (fio_ime_engine_init(td) < 0)
return 1;
ime_d = calloc(1, sizeof(*ime_d));
ime_d->sioreq = malloc(sizeof(struct imesio_req));
ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
ime_d->depth = td->o.iodepth;
td->io_ops_data = ime_d;
return 0;
}
static void fio_ime_psyncv_clean(struct thread_data *td)
{
struct ime_data *ime_d = td->io_ops_data;
if (ime_d) {
free(ime_d->sioreq);
free(ime_d->iovecs);
free(ime_d->io_us);
free(ime_d);
td->io_ops_data = NULL;
}
fio_ime_engine_finalize(td);
}
/**************************************************************
* Private functions for non-blocking IOs
*
**************************************************************/
void fio_ime_aio_complete_cb (struct ime_aiocb *aiocb, int err,
ssize_t bytes)
{
struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;
pthread_mutex_lock(&ioreq->status_mutex);
ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
pthread_mutex_unlock(&ioreq->status_mutex);
pthread_cond_signal(&ioreq->cond_endio);
}
static bool fio_ime_aio_can_queue (struct ime_data *ime_d, struct io_u *io_u)
{
/* So far we can queue in any case. */
return true;
}
static bool fio_ime_aio_can_append (struct ime_data *ime_d, struct io_u *io_u)
{
/* We can only append if:
- The iovecs will be contiguous in the array
- There is already a queued iovec
- The offsets are contiguous
- The ddir and fs are the same */
return (ime_d->head != 0 &&
ime_d->queued - ime_d->events > 0 &&
ime_d->last_offset == io_u->offset &&
ime_d->last_req->ddir == io_u->ddir &&
ime_d->last_req->iocb.fd == io_u->file->fd);
}
/* Before using this function, we should have already
ensured that the queue is not full */
static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
{
struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
struct ime_aiocb *iocb = &ioreq->iocb;
struct iovec *iov = &ime_d->iovecs[ime_d->head];
iov->iov_base = io_u->xfer_buf;
iov->iov_len = io_u->xfer_buflen;
if (fio_ime_aio_can_append(ime_d, io_u))
ime_d->last_req->iocb.iovcnt++;
else {
ioreq->status = FIO_IME_IN_PROGRESS;
ioreq->ddir = io_u->ddir;
ime_d->last_req = ioreq;
iocb->complete_cb = &fio_ime_aio_complete_cb;
iocb->fd = io_u->file->fd;
iocb->file_offset = io_u->offset;
iocb->iov = iov;
iocb->iovcnt = 1;
iocb->flags = 0;
iocb->user_context = (intptr_t) ioreq;
}
ime_d->io_us[ime_d->head] = io_u;
ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
fio_ime_queue_incr(ime_d);
}
/* Tries to queue an IO. It will create a new request if the IO can't be
appended to the current request. It will fail if the queue can't contain
any more io_u/iovec. In this case, commit and then get_events need to be
called. */
static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
struct io_u *io_u)
{
struct ime_data *ime_d = td->io_ops_data;
fio_ro_check(td, io_u);
dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
io_u->ddir, ime_d->head, ime_d->cur_commit,
ime_d->queued, ime_d->events);
if (ime_d->queued == ime_d->depth)
return FIO_Q_BUSY;
if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
if (!fio_ime_aio_can_queue(ime_d, io_u))
return FIO_Q_BUSY;
fio_ime_aio_enqueue(ime_d, io_u);
return FIO_Q_QUEUED;
} else if (io_u->ddir == DDIR_SYNC) {
if (ime_native_fsync(io_u->file->fd) < 0) {
io_u->error = errno;
td_verror(td, io_u->error, "fsync");
}
return FIO_Q_COMPLETED;
} else {
io_u->error = EINVAL;
td_verror(td, io_u->error, "wrong ddir");
return FIO_Q_COMPLETED;
}
}
static int fio_ime_aio_commit(struct thread_data *td)
{
struct ime_data *ime_d = td->io_ops_data;
struct imeaio_req *ioreq;
int ret = 0;
/* Loop while there are events to commit */
while (ime_d->queued - ime_d->events) {
ioreq = &ime_d->aioreqs[ime_d->cur_commit];
if (ioreq->ddir == DDIR_READ)
ret = ime_native_aio_read(&ioreq->iocb);
else
ret = ime_native_aio_write(&ioreq->iocb);
fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);
/* fio needs a negative error code */
if (ret < 0) {
ioreq->status = FIO_IME_REQ_ERROR;
return -errno;
}
io_u_mark_submit(td, ioreq->iocb.iovcnt);
dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
ioreq->iocb.iovcnt, ime_d->cur_commit,
ime_d->queued, ime_d->events);
}
return 0;
}
static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
unsigned int max, const struct timespec *t)
{
struct ime_data *ime_d = td->io_ops_data;
struct imeaio_req *ioreq;
struct io_u *io_u;
int events = 0;
unsigned int count;
ssize_t bytes;
while (ime_d->events) {
ioreq = &ime_d->aioreqs[ime_d->tail];
/* Break if we already got events, and if we will
exceed max if we append the next events */
if (events && events + ioreq->iocb.iovcnt > max)
break;
if (ioreq->status != FIO_IME_IN_PROGRESS) {
bytes = ioreq->status;
for (count = 0; count < ioreq->iocb.iovcnt; count++) {
io_u = ime_d->io_us[ime_d->tail];
ime_d->event_io_us[events] = io_u;
events++;
fio_ime_queue_red(ime_d);
if (ioreq->status == FIO_IME_REQ_ERROR)
io_u->error = EIO;
else {
io_u->resid = bytes > io_u->xfer_buflen ?
0 : io_u->xfer_buflen - bytes;
io_u->error = 0;
bytes -= io_u->xfer_buflen - io_u->resid;
}
}
} else {
pthread_mutex_lock(&ioreq->status_mutex);
while (ioreq->status == FIO_IME_IN_PROGRESS)
pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
pthread_mutex_unlock(&ioreq->status_mutex);
}
}
dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
events, ime_d->queued, ime_d->events);
return events;
}
static int fio_ime_aio_init(struct thread_data *td)
{
struct ime_data *ime_d;
struct imeaio_req *ioreq;
unsigned int i;
if (fio_ime_engine_init(td) < 0)
return 1;
ime_d = calloc(1, sizeof(*ime_d));
ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
ime_d->depth = td->o.iodepth;
for (i = 0; i < ime_d->depth; i++) {
ioreq = &ime_d->aioreqs[i];
pthread_cond_init(&ioreq->cond_endio, NULL);
pthread_mutex_init(&ioreq->status_mutex, NULL);
}
td->io_ops_data = ime_d;
return 0;
}
static void fio_ime_aio_clean(struct thread_data *td)
{
struct ime_data *ime_d = td->io_ops_data;
struct imeaio_req *ioreq;
unsigned int i;
if (ime_d) {
for (i = 0; i < ime_d->depth; i++) {
ioreq = &ime_d->aioreqs[i];
pthread_cond_destroy(&ioreq->cond_endio);
pthread_mutex_destroy(&ioreq->status_mutex);
}
free(ime_d->aioreqs);
free(ime_d->iovecs);
free(ime_d->io_us);
free(ime_d);
td->io_ops_data = NULL;
}
fio_ime_engine_finalize(td);
}
/**************************************************************
* IO engines definitions
*
**************************************************************/
/* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
FIO from using POSIX calls. See fio_ime_open_file for more details. */
static struct ioengine_ops ioengine_prw = {
.name = "ime_psync",
.version = FIO_IOOPS_VERSION,
.setup = fio_ime_setup,
.init = fio_ime_engine_init,
.cleanup = fio_ime_engine_finalize,
.queue = fio_ime_psync_queue,
.open_file = fio_ime_open_file,
.close_file = fio_ime_close_file,
.get_file_size = fio_ime_get_file_size,
.unlink_file = fio_ime_unlink_file,
.flags = FIO_SYNCIO | FIO_DISKLESSIO,
};
static struct ioengine_ops ioengine_pvrw = {
.name = "ime_psyncv",
.version = FIO_IOOPS_VERSION,
.setup = fio_ime_setup,
.init = fio_ime_psyncv_init,
.cleanup = fio_ime_psyncv_clean,
.queue = fio_ime_psyncv_queue,
.commit = fio_ime_psyncv_commit,
.getevents = fio_ime_psyncv_getevents,
.event = fio_ime_event,
.open_file = fio_ime_open_file,
.close_file = fio_ime_close_file,
.get_file_size = fio_ime_get_file_size,
.unlink_file = fio_ime_unlink_file,
.flags = FIO_SYNCIO | FIO_DISKLESSIO,
};
static struct ioengine_ops ioengine_aio = {
.name = "ime_aio",
.version = FIO_IOOPS_VERSION,
.setup = fio_ime_setup,
.init = fio_ime_aio_init,
.cleanup = fio_ime_aio_clean,
.queue = fio_ime_aio_queue,
.commit = fio_ime_aio_commit,
.getevents = fio_ime_aio_getevents,
.event = fio_ime_event,
.open_file = fio_ime_open_file,
.close_file = fio_ime_close_file,
.get_file_size = fio_ime_get_file_size,
.unlink_file = fio_ime_unlink_file,
.flags = FIO_DISKLESSIO,
};
static void fio_init fio_ime_register(void)
{
register_ioengine(&ioengine_prw);
register_ioengine(&ioengine_pvrw);
register_ioengine(&ioengine_aio);
}
static void fio_exit fio_ime_unregister(void)
{
unregister_ioengine(&ioengine_prw);
unregister_ioengine(&ioengine_pvrw);
unregister_ioengine(&ioengine_aio);
if (fio_ime_is_initialized && ime_native_finalize() < 0)
log_err("Warning: IME did not finalize properly\n");
}