===================================================================
@@ -699,7 +699,8 @@ gfc_finish_var_decl (tree decl, gfc_symbol * sym)
&& CLASS_DATA (sym)->ts.u.derived->attr.has_dtio_procs)))
TREE_STATIC (decl) = 1;
- if (sym->attr.volatile_)
+ /* Treat asynchronous variables the same as volatile, for now. */
+ if (sym->attr.volatile_ || sym->attr.asynchronous)
{
TREE_THIS_VOLATILE (decl) = 1;
TREE_SIDE_EFFECTS (decl) = 1;
===================================================================
@@ -438,10 +438,9 @@ gfc_build_io_library_fndecls (void)
get_identifier (PREFIX("st_iolength")), ".w",
void_type_node, 1, dt_parm_type);
- /* TODO: Change when asynchronous I/O is implemented. */
parm_type = build_pointer_type (st_parameter[IOPARM_ptype_wait].type);
iocall[IOCALL_WAIT] = gfc_build_library_function_decl_with_spec (
- get_identifier (PREFIX("st_wait")), ".X",
+ get_identifier (PREFIX("st_wait_async")), ".w",
void_type_node, 1, parm_type);
parm_type = build_pointer_type (st_parameter[IOPARM_ptype_filepos].type);
@@ -1527,7 +1526,7 @@ gfc_trans_wait (gfc_code * code)
mask |= IOPARM_common_err;
if (p->id)
- mask |= set_parameter_value (&block, var, IOPARM_wait_id, p->id);
+ mask |= set_parameter_ref (&block, &post_block, var, IOPARM_wait_id, p->id);
set_parameter_const (&block, var, IOPARM_common_flags, mask);
===================================================================
@@ -100,7 +100,8 @@ io/transfer128.c \
io/unit.c \
io/unix.c \
io/write.c \
-io/fbuf.c
+io/fbuf.c \
+io/async.c
endif
@@ -108,7 +109,8 @@ gfor_io_headers= \
io/io.h \
io/fbuf.h \
io/format.h \
-io/unix.h
+io/unix.h \
+io/async.h
gfor_helper_src= \
intrinsics/associated.c \
===================================================================
@@ -70,7 +70,8 @@ target_triplet = @target@
@LIBGFOR_MINIMAL_FALSE@io/unit.c \
@LIBGFOR_MINIMAL_FALSE@io/unix.c \
@LIBGFOR_MINIMAL_FALSE@io/write.c \
-@LIBGFOR_MINIMAL_FALSE@io/fbuf.c
+@LIBGFOR_MINIMAL_FALSE@io/fbuf.c \
+@LIBGFOR_MINIMAL_FALSE@io/async.c
@LIBGFOR_MINIMAL_FALSE@am__append_3 = \
@LIBGFOR_MINIMAL_FALSE@intrinsics/access.c \
@@ -352,7 +353,7 @@ am__objects_47 = $(am__objects_4) $(am__objects_5)
@LIBGFOR_MINIMAL_FALSE@ inquire.lo intrinsics.lo list_read.lo \
@LIBGFOR_MINIMAL_FALSE@ lock.lo open.lo read.lo transfer.lo \
@LIBGFOR_MINIMAL_FALSE@ transfer128.lo unit.lo unix.lo write.lo \
-@LIBGFOR_MINIMAL_FALSE@ fbuf.lo
+@LIBGFOR_MINIMAL_FALSE@ fbuf.lo async.lo
am__objects_49 = size_from_kind.lo $(am__objects_48)
@LIBGFOR_MINIMAL_FALSE@am__objects_50 = access.lo c99_functions.lo \
@LIBGFOR_MINIMAL_FALSE@ chdir.lo chmod.lo clock.lo cpu_time.lo \
@@ -650,7 +651,8 @@ gfor_io_headers = \
io/io.h \
io/fbuf.h \
io/format.h \
-io/unix.h
+io/unix.h \
+io/async.h
gfor_helper_src = intrinsics/associated.c intrinsics/abort.c \
intrinsics/args.c intrinsics/cshift0.c intrinsics/eoshift0.c \
@@ -1550,6 +1552,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/any_l8.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/args.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/associated.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/async.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/backtrace.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/bessel_r10.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/bessel_r16.Plo@am__quote@
@@ -5813,6 +5816,13 @@ fbuf.lo: io/fbuf.c
@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o fbuf.lo `test -f 'io/fbuf.c' || echo '$(srcdir)/'`io/fbuf.c
+async.lo: io/async.c
+@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT async.lo -MD -MP -MF $(DEPDIR)/async.Tpo -c -o async.lo `test -f 'io/async.c' || echo '$(srcdir)/'`io/async.c
+@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/async.Tpo $(DEPDIR)/async.Plo
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='io/async.c' object='async.lo' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o async.lo `test -f 'io/async.c' || echo '$(srcdir)/'`io/async.c
+
associated.lo: intrinsics/associated.c
@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT associated.lo -MD -MP -MF $(DEPDIR)/associated.Tpo -c -o associated.lo `test -f 'intrinsics/associated.c' || echo '$(srcdir)/'`intrinsics/associated.c
@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/associated.Tpo $(DEPDIR)/associated.Plo
===================================================================
@@ -1482,3 +1482,8 @@ GFORTRAN_C99_8 {
y1f;
ynf;
};
+
+GFORTRAN_9 {
+ global:
+ _gfortran_st_wait_async;
+};
===================================================================
@@ -0,0 +1,401 @@
+/* Copyright (C) 2018 Free Software Foundation, Inc.
+ Contributed by Nicolas Koenig
+
+This file is part of the GNU Fortran runtime library (libgfortran).
+
+Libgfortran is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 3, or (at your option)
+any later version.
+
+Libgfortran is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+Under Section 7 of GPL version 3, you are granted additional
+permissions described in the GCC Runtime Library Exception, version
+3.1, as published by the Free Software Foundation.
+
+You should have received a copy of the GNU General Public License and
+a copy of the GCC Runtime Library Exception along with this program;
+see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
+<http://www.gnu.org/licenses/>. */
+
+#include "libgfortran.h"
+
+#ifndef GTHREAD_USE_WEAK
+#ifdef SUPPORTS_WEAK
+#define GTHREAD_USE_WEAK 1
+#endif
+#endif
+
+#define _GTHREAD_USE_COND_INIT_FUNC
+#include "../../libgcc/gthr-posix.h"
+#include "io.h"
+#include "fbuf.h"
+#include "format.h"
+#include "unix.h"
+#include <string.h>
+#include <assert.h>
+
+#include <sys/types.h>
+
+#include "async.h"
+
+DEBUG_LINE(__thread const char *aio_prefix = MPREFIX);
+
+DEBUG_LINE(__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
+DEBUG_LINE(aio_lock_debug *aio_debug_head = NULL;)
+
+__thread gfc_unit *thread_unit = NULL;
+
+typedef struct transfer_queue
+{
+ enum aio_do type;
+ struct transfer_queue *next;
+ struct st_parameter_dt *new_pdt;
+ transfer_args arg;
+ _Bool has_id;
+} transfer_queue;
+
+struct error {
+ st_parameter_dt *dtp;
+ int id;
+};
+
+static void
+update_pdt(st_parameter_dt **old, st_parameter_dt *new) {
+ st_parameter_dt *temp;
+ NOTE("Changing pdts");
+ temp = *old;
+ *old = new;
+ if(temp)
+ free(temp);
+}
+
+static void
+destroy_adv_cond (struct adv_cond * ac)
+{
+ T_ERROR (__gthread_mutex_destroy, &ac->lock);
+ T_ERROR (__gthread_cond_destroy, &ac->signal);
+}
+
+static void *
+async_io (void *arg)
+{
+ DEBUG_LINE(aio_prefix = TPREFIX);
+ transfer_queue *ctq = NULL, *prev = NULL;
+ gfc_unit *u = (gfc_unit *) arg;
+ async_unit *au = u->au;
+ thread_unit = u;
+ LOCK (&au->lock);
+ au->thread = __gthread_self ();
+ UNLOCK (&au->lock);
+ while (true)
+ {
+ WAIT_SIGNAL (&au->work, au->tail || au->finished);
+ LOCK (&au->lock);
+ ctq = au->head;
+ prev = NULL;
+ while (ctq)
+ {
+ if (prev)
+ free (prev);
+ prev = ctq;
+ if (!au->error.has_error)
+ {
+ UNLOCK (&au->lock);
+
+ switch(ctq->type)
+ {
+ case AIO_WRITE_DONE:
+ NOTE("Finalizing write");
+ st_write_done_worker (au->pdt);
+ break;
+ case AIO_READ_DONE:
+ NOTE("Finalizing read");
+ st_read_done_worker (au->pdt);
+ break;
+ case AIO_CHANGE_PDT:
+ update_pdt(&au->pdt, ctq->new_pdt);
+ break;
+ case AIO_TRANSFER_SCALAR:
+ NOTE("Starting scalar transfer");
+ ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
+ ctq->arg.scalar.data,
+ ctq->arg.scalar.i,
+ ctq->arg.scalar.s1,
+ ctq->arg.scalar.s2);
+ break;
+ case AIO_TRANSFER_ARRAY:
+ NOTE("Starting array transfer");
+ NOTE("ctq->arg.array.desc = %p", (void *) (ctq->arg.array.desc));
+ transfer_array_inner(au->pdt, ctq->arg.array.desc,
+ ctq->arg.array.kind,
+ ctq->arg.array.charlen);
+ NOTE("desc = %p", (void *) ctq->arg.array.desc);
+ free (ctq->arg.array.desc);
+ break;
+ default:
+ internal_error(NULL, "Invalid queue type");
+ break;
+ }
+ LOCK (&au->lock);
+ if (unlikely (au->error.has_error))
+ {
+ au->error.last_good_id = au->id.low - 1;
+ unlock_unit (au->pdt->u.p.current_unit);
+ }
+ }
+ NOTE("Current id: %d", au->id.low);
+ if (ctq->has_id && au->id.waiting == au->id.low++)
+ SIGNAL (&au->id.done);
+ ctq = ctq->next;
+ }
+ au->tail = NULL;
+ au->head = NULL;
+ SIGNAL (&au->emptysignal);
+ au->empty = 1;
+ if (au->finished)
+ break;
+ UNLOCK (&au->lock);
+ }
+ UNLOCK (&au->lock);
+ return NULL;
+}
+
+
+static void
+free_async_unit (async_unit * au)
+{
+ if (au->tail)
+ internal_error(NULL, "Trying to free nonempty unit");
+ destroy_adv_cond (&au->work);
+ destroy_adv_cond (&au->emptysignal);
+ destroy_adv_cond (&au->id.done);
+ T_ERROR (__gthread_mutex_destroy, &au->lock);
+ free (au);
+}
+
+static void
+init_adv_cond (struct adv_cond * ac)
+{
+ ac->pending = 0;
+ __GTHREAD_MUTEX_INIT_FUNCTION(&ac->lock);
+ __gthread_cond_init_function (&ac->signal);
+}
+
+async_unit *
+init_async_unit (gfc_unit *u)
+{
+ async_unit *au;
+ if (!__gthread_active_p ())
+ return NULL;
+
+ au = (async_unit *) xmalloc (sizeof (async_unit));
+ u->au = au;
+ init_adv_cond (&au->work);
+ init_adv_cond (&au->emptysignal);
+ __GTHREAD_MUTEX_INIT_FUNCTION(&au->lock);
+ T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
+ au->pdt = NULL;
+ au->head = NULL;
+ au->tail = NULL;
+ au->empty = true;
+ au->finished = 0;
+ au->id.waiting = -1;
+ au->id.low = 0;
+ au->id.high = 0;
+ au->error.fatal_error = 0;
+ au->error.has_error = 0;
+ au->error.last_good_id = 0;
+ init_adv_cond (&au->id.done);
+ return au;
+}
+
+
+void
+enqueue_transfer (async_unit * au, transfer_args * arg, enum aio_do type)
+{
+ transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
+ tq->arg = *arg;
+ tq->type = type;
+ tq->has_id = 0;
+ LOCK (&au->lock);
+ if (!au->tail)
+ au->head = tq;
+ else
+ au->tail->next = tq;
+ au->tail = tq;
+ REVOKE_SIGNAL (&(au->emptysignal));
+ au->empty = false;
+ UNLOCK (&au->lock);
+ SIGNAL(&au->work);
+}
+
+int
+enqueue_done_id (async_unit *au, enum aio_do type) {
+ int ret;
+ transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
+
+ tq->type = type;
+ tq->has_id = 1;
+ LOCK (&au->lock);
+ if (!au->tail)
+ au->head = tq;
+ else
+ au->tail->next = tq;
+ au->tail = tq;
+ REVOKE_SIGNAL (&(au->emptysignal));
+ au->empty = false;
+ ret = au->id.high++;
+ NOTE("Enqueue id: %d", ret)
+ UNLOCK(&au->lock);
+ SIGNAL(&au->work);
+ return ret;
+}
+
+void
+enqueue_done (async_unit *au, enum aio_do type)
+{
+ transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
+ tq->type = type;
+ tq->has_id = 0;
+ LOCK (&au->lock);
+ if (!au->tail)
+ au->head = tq;
+ else
+ au->tail->next = tq;
+ au->tail = tq;
+ REVOKE_SIGNAL (&(au->emptysignal));
+ au->empty = false;
+ UNLOCK (&au->lock);
+ SIGNAL(&au->work);
+}
+
+void
+change_pdt (async_unit *au, st_parameter_dt *dt) {
+ st_parameter_dt *new = xmalloc(sizeof(st_parameter_dt));
+ transfer_queue *tq = xmalloc(sizeof(transfer_queue));
+ memcpy((void *) new, (void *) dt, sizeof(st_parameter_dt));
+ tq->next = NULL;
+ tq->new_pdt = new;
+ tq->type = AIO_CHANGE_PDT;
+ tq->has_id = 0;
+ LOCK(&au->lock);
+ if (!au->tail)
+ au->head = tq;
+ else
+ au->tail->next = tq;
+ au->tail = tq;
+ REVOKE_SIGNAL (&(au->emptysignal));
+ au->empty = 0;
+ UNLOCK(&au->lock);
+ SIGNAL(&au->work);
+}
+
+/* Perform a wait operation on an asynchronous unit, which means collecting
+ the errors that may have happened asynchronously. Return true if an error
+ has been encountered. */
+
+static bool
+async_wait_operation (st_parameter_common *cmp, async_unit *au)
+{
+ bool has_error = au->error.has_error;
+
+ if (has_error)
+ {
+ if (generate_error_common (cmp, au->error.family, au->error.message))
+ {
+ au->error.has_error = 0;
+ au->error.cmp = NULL;
+ }
+ else
+ {
+ /* The program will exit later. */
+ au->error.fatal_error = true;
+ }
+ }
+ return has_error;
+}
+
+/* Perform a wait operation on an asynchronous unit with an ID specified,
+ which means collecting the errors that may have happened asynchronously.
+ Return true if an error has been encountered. */
+
+bool
+async_wait_id (st_parameter_common *cmp, async_unit * au, int i)
+{
+ bool ret;
+
+ if (au == NULL)
+ return false;
+
+ if (cmp == NULL)
+ cmp = au->error.cmp;
+
+ if (au->error.has_error)
+ {
+ if (i <= au->error.last_good_id)
+ return false;
+
+ return async_wait_operation (cmp, au);
+ }
+
+ LOCK (&au->lock);
+ NOTE("Waiting for id %d", i);
+ if (au->id.waiting < i)
+ au->id.waiting = i;
+ UNLOCK (&au->lock);
+ SIGNAL (&(au->work));
+ WAIT_SIGNAL (&(au->id.done), (au->id.low >= au->id.waiting || au->empty));
+ LOCK (&au->lock);
+ ret = async_wait_operation (cmp, au);
+ UNLOCK(&au->lock);
+ return ret;
+}
+
+bool
+async_wait (st_parameter_common *cmp, async_unit * au)
+{
+ bool ret;
+
+ if (au == NULL)
+ return false;
+
+ if (cmp == NULL)
+ cmp = au->error.cmp;
+
+ SIGNAL (&(au->work));
+ LOCK(&au->lock);
+ if (au->empty)
+ {
+ ret = async_wait_operation (cmp, au);
+ UNLOCK(&au->lock);
+ return ret;
+ }
+
+ UNLOCK(&au->lock);
+ WAIT_SIGNAL (&(au->emptysignal), (au->empty));
+ LOCK (&au->lock);
+ ret = async_wait_operation (cmp, au);
+ UNLOCK(&au->lock);
+ return ret;
+}
+
+void
+async_close (async_unit * au)
+{
+ NOTE("Closing async unit");
+ if (!au->error.fatal_error)
+ {
+ LOCK (&au->lock);
+ au->finished = 1;
+ UNLOCK (&au->lock);
+ NOTE ("About to wait in async_close");
+ async_wait (NULL, au);
+ T_ERROR (__gthread_join, au->thread, NULL);
+ }
+ free_async_unit(au);
+}
===================================================================
@@ -0,0 +1,332 @@
+/* Copyright (C) 2018 Free Software Foundation, Inc.
+ Contributed by Nicolas Koenig
+
+This file is part of the GNU Fortran runtime library (libgfortran).
+
+Libgfortran is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 3, or (at your option)
+any later version.
+
+Libgfortran is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+Under Section 7 of GPL version 3, you are granted additional
+permissions described in the GCC Runtime Library Exception, version
+3.1, as published by the Free Software Foundation.
+
+You should have received a copy of the GNU General Public License and
+a copy of the GCC Runtime Library Exception along with this program;
+see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
+<http://www.gnu.org/licenses/>. */
+
+#ifndef ASYNC_H
+#define ASYNC_H
+
+#define DEBUG_ASYNC
+#undef DEBUG_ASYNC
+
+#ifdef DEBUG_ASYNC
+
+#define DEBUG_PRINTF(...) fprintf(stderr,__VA_ARGS__)
+
+#define IN_DEBUG_QUEUE(mutex) ({\
+ __label__ end;\
+ aio_lock_debug *curr = aio_debug_head;\
+ while(curr) { \
+ if (curr->m == mutex) {\
+ goto end;\
+ }\
+ curr = curr->next;\
+ }\
+ end:;\
+ curr;\
+ })
+
+#define TAIL_DEBUG_QUEUE ({\
+ aio_lock_debug *curr = aio_debug_head; \
+ while(curr && curr->next) { \
+ curr = curr->next;\
+ }\
+ curr;\
+ })
+
+#define CHECK_LOCK(mutex, status) do {\
+ aio_lock_debug *curr;\
+ INTERN_LOCK(&debug_queue_lock); \
+ if (__gthread_mutex_trylock(mutex)) {\
+ if ((curr = IN_DEBUG_QUEUE(mutex))) {\
+ sprintf(status, "\033[31m%s():%d\033[0m", curr->func, curr->line);\
+ } else\
+ sprintf(status, "\033[31munknown\033[0m");\
+ }\
+ else {\
+ __gthread_mutex_unlock(mutex);\
+ sprintf(status, "\033[32munlocked\033[0m");\
+ }\
+ INTERN_UNLOCK(&debug_queue_lock);\
+ }while(0)
+
+#define T_ERROR(func, ...) do {\
+ int t_error_temp; \
+ t_error_temp = func(__VA_ARGS__);\
+ if(t_error_temp)\
+ ERROR(t_error_temp, "args: " #__VA_ARGS__ "\n");\
+ } while(0)
+
+#define NOTE(str, ...) do{\
+ char note_str[200];\
+ sprintf(note_str, "%s\033[35mNOTE: \033[0m" str, aio_prefix, ##__VA_ARGS__);\
+ DEBUG_PRINTF("%-90s %20s():5%d\n", note_str, __FUNCTION__, __LINE__);\
+ }while(0);
+
+#define ERROR(errnum, str, ...) do{\
+ char note_str[200];\
+ sprintf(note_str, "%s\033[41;37mERROR:\033[0m [%d] " str, aio_prefix, \
+ errnum, ##__VA_ARGS__);\
+ DEBUG_PRINTF("%-68s %s():%d\n", note_str, __FUNCTION__, __LINE__);\
+ }while(0)
+
+#define MUTEX_DEBUG_ADD(mutex) do {\
+ aio_lock_debug *n;\
+ n = malloc(sizeof(aio_lock_debug));\
+ n->prev = TAIL_DEBUG_QUEUE;\
+ if (n->prev) \
+ n->prev->next = n; \
+ n->next = NULL;\
+ n->line = __LINE__;\
+ n->func = __FUNCTION__;\
+ n->m = mutex;\
+ if (!aio_debug_head) {\
+ aio_debug_head = n;\
+ }\
+ } while(0)
+
+#define UNLOCK(mutex) do {\
+ aio_lock_debug *curr;\
+ DEBUG_PRINTF("%s%-75s %20s():%-5d %18p\n", aio_prefix, "\033[32mUNLOCK: \033[0m" #mutex, \
+ __FUNCTION__, __LINE__, (void *) mutex); \
+ INTERN_LOCK(&debug_queue_lock);\
+ curr = IN_DEBUG_QUEUE(mutex);\
+ if (curr) \
+ {\
+ if (curr->prev)\
+ curr->prev->next = curr->next;\
+ if (curr->next) {\
+ curr->next->prev = curr->prev;\
+ if (curr == aio_debug_head)\
+ aio_debug_head = curr->next;\
+ } else {\
+ if (curr == aio_debug_head)\
+ aio_debug_head = NULL;\
+ }\
+ free(curr);\
+ }\
+ INTERN_UNLOCK(&debug_queue_lock);\
+ INTERN_UNLOCK(mutex);\
+ }while(0)
+
+#define TRYLOCK(mutex) ({\
+ char status[200];\
+ int res;\
+ aio_lock_debug *curr;\
+ res = __gthread_mutex_trylock(mutex);\
+ INTERN_LOCK(&debug_queue_lock); \
+ if (res) {\
+ if ((curr = IN_DEBUG_QUEUE(mutex))) {\
+ sprintf(status, "\033[31m%s():%d\033[0m", curr->func, curr->line);\
+ } else\
+ sprintf(status, "\033[31munknown\033[0m");\
+ }\
+ else {\
+ sprintf(status, "\033[32munlocked\033[0m");\
+ MUTEX_DEBUG_ADD(mutex);\
+ }\
+ DEBUG_PRINTF("%s%-44s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \
+ "\033[31;2mTRYLOCK: \033[0m" #mutex, status, __FUNCTION__, __LINE__,\
+ (void *) mutex);\
+ INTERN_UNLOCK(&debug_queue_lock);\
+ res;\
+ })
+
+#define LOCK(mutex) do {\
+ char status[200];\
+ CHECK_LOCK(mutex, status);\
+ DEBUG_PRINTF("%s%-42s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \
+ "\033[31mLOCK: \033[0m" #mutex, status, __FUNCTION__, __LINE__, (void *) mutex); \
+ INTERN_LOCK(mutex);\
+ INTERN_LOCK(&debug_queue_lock); \
+ MUTEX_DEBUG_ADD(mutex);\
+ INTERN_UNLOCK(&debug_queue_lock);\
+ DEBUG_PRINTF("%s\033[31mACQ:\033[0m %-30s %78p\n", aio_prefix, #mutex, mutex); \
+ }while(0)
+
+#define DEBUG_LINE(...) __VA_ARGS__
+
+#else
+#define DEBUG_PRINTF(...) {}
+#define CHECK_LOCK(au, mutex, status) {}
+#define NOTE(str, ...) {}
+#define DEBUG_LINE(...)
+#define T_ERROR(func, ...) func(__VA_ARGS__)
+#define LOCK(mutex) INTERN_LOCK(mutex)
+#define UNLOCK(mutex) INTERN_UNLOCK(mutex)
+#define TRYLOCK(mutex) (__gthread_mutex_trylock(mutex))
+#endif
+
+#define MPREFIX "\033[30;46mM:\033[0m "
+#define TPREFIX "\033[37;44mT:\033[0m "
+#define RPREFIX "\033[37;41mR:\033[0m "
+
+#define INTERN_LOCK(mutex) T_ERROR(__gthread_mutex_lock, mutex);
+
+#define INTERN_UNLOCK(mutex) T_ERROR(__gthread_mutex_unlock, mutex);
+
+#define SIGNAL(advcond) do{\
+ INTERN_LOCK(&(advcond)->lock);\
+ (advcond)->pending = 1;\
+ INTERN_UNLOCK(&(advcond)->lock);\
+ DEBUG_PRINTF("%s%-75s %20s():%-5d %18p\n", aio_prefix, "\033[33mSIGNAL: \033[0m" \
+ #advcond, __FUNCTION__, __LINE__, (void *) advcond);\
+ T_ERROR(__gthread_cond_broadcast, &(advcond)->signal);\
+ }while(0)
+
+#define WAIT_SIGNAL(advcond, condition) do{\
+ __label__ finish;\
+ INTERN_LOCK(&((advcond)->lock));\
+ DEBUG_PRINTF("%s%-75s %20s():%-5d %18p\n", aio_prefix, "\033[34mWAITING: \033[0m" \
+ #advcond, __FUNCTION__, __LINE__, (void *) advcond);\
+ if ((advcond)->pending){\
+ goto finish;\
+ }\
+ if ( condition ) {\
+ goto finish; \
+ }\
+ while(!__gthread_cond_wait(&(advcond)->signal, &(advcond)->lock)) {\
+ if ( condition ){\
+ DEBUG_PRINTF("%s%-75s %20s():%-5d %18p\n", aio_prefix, "\033[33mREC: \033[0m" \
+ #advcond, __FUNCTION__, __LINE__, (void *)advcond);\
+ break;\
+ }\
+ }\
+ finish: \
+ (advcond)->pending = 0;\
+ INTERN_UNLOCK(&((advcond)->lock));\
+ }while(0)
+
+#define REVOKE_SIGNAL(advcond) do{\
+ INTERN_LOCK(&(advcond)->lock);\
+ (advcond)->pending = 0;\
+ INTERN_UNLOCK(&(advcond)->lock);\
+ }while(0)
+
+DEBUG_LINE(extern __thread const char *aio_prefix);
+
+DEBUG_LINE(typedef struct aio_lock_debug{
+ __gthread_mutex_t *m;
+ int line;
+ const char *func;
+ struct aio_lock_debug *next;
+ struct aio_lock_debug *prev;
+} aio_lock_debug;)
+
+DEBUG_LINE(extern aio_lock_debug *aio_debug_head;)
+DEBUG_LINE(extern __gthread_mutex_t debug_queue_lock;)
+
+extern __thread gfc_unit *thread_unit;
+
+enum aio_do {
+ AIO_INVALID = 0,
+ AIO_CHANGE_PDT,
+ AIO_TRANSFER_SCALAR,
+ AIO_TRANSFER_ARRAY,
+ AIO_WRITE_DONE,
+ AIO_READ_DONE
+};
+
+typedef union transfer_args
+{
+ struct
+ {
+ void (*transfer) (struct st_parameter_dt *, bt, void *, int, size_t, size_t);
+ bt arg_bt;
+ void *data;
+ int i;
+ size_t s1;
+ size_t s2;
+ } scalar;
+ struct
+ {
+ gfc_array_char *desc;
+ int kind;
+ gfc_charlen_type charlen;
+ } array;
+} transfer_args;
+
+struct adv_cond
+{
+ int pending;
+ __gthread_mutex_t lock;
+ __gthread_cond_t signal;
+};
+
+typedef struct async_unit
+{
+ pthread_mutex_t lock;
+ struct adv_cond work;
+ struct adv_cond emptysignal;
+ struct st_parameter_dt *pdt;
+ pthread_t thread;
+ struct transfer_queue *head;
+ struct transfer_queue *tail;
+ struct
+ {
+ int waiting;
+ int low;
+ int high;
+ struct adv_cond done;
+ } id;
+
+ int finished;
+ bool empty;
+
+ struct {
+ const char *message;
+ st_parameter_common *cmp;
+ bool has_error;
+ int last_good_id;
+ int family;
+ bool fatal_error;
+ } error;
+
+} async_unit;
+
+async_unit *init_async_unit(gfc_unit *);
+internal_proto (init_async_unit);
+
+bool async_wait (st_parameter_common *, async_unit *);
+internal_proto(async_wait);
+
+bool async_wait_id (st_parameter_common *, async_unit *, int);
+internal_proto(async_wait_id);
+
+void async_close (async_unit *);
+internal_proto(async_close);
+
+void enqueue_transfer (async_unit * au, transfer_args * arg, enum aio_do);
+internal_proto(enqueue_transfer);
+
+void enqueue_done (async_unit *, enum aio_do type);
+internal_proto(enqueue_done);
+
+int enqueue_done_id (async_unit *, enum aio_do type);
+internal_proto(enqueue_done_id);
+
+void enqueue_init (async_unit *);
+internal_proto(enqueue_init);
+
+void change_pdt(async_unit *, st_parameter_dt *);
+internal_proto(change_pdt);
+#endif
===================================================================
@@ -24,6 +24,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
#include "io.h"
#include "unix.h"
+#include "async.h"
#include <limits.h>
typedef enum
@@ -57,6 +58,15 @@ st_close (st_parameter_close *clp)
find_option (&clp->common, clp->status, clp->status_len,
status_opt, "Bad STATUS parameter in CLOSE statement");
+ u = find_unit (clp->common.unit);
+
+ if (u && u->au)
+ if (async_wait (&(clp->common), u->au))
+ {
+ library_end();
+ return;
+ }
+
if ((clp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
{
library_end ();
@@ -63,7 +73,6 @@ st_close (st_parameter_close *clp)
return;
}
- u = find_unit (clp->common.unit);
if (u != NULL)
{
if (close_share (u) < 0)
===================================================================
@@ -25,6 +25,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
#include "io.h"
#include "fbuf.h"
#include "unix.h"
+#include "async.h"
#include <string.h>
/* file_pos.c-- Implement the file positioning statements, i.e. BACKSPACE,
@@ -214,6 +215,9 @@ st_backspace (st_parameter_filepos *fpp)
goto done;
}
+ if (u->au && async_wait (&(fpp->common), u->au))
+ return;
+
/* Make sure format buffer is flushed and reset. */
if (u->flags.form == FORM_FORMATTED)
{
@@ -294,6 +298,9 @@ st_endfile (st_parameter_filepos *fpp)
goto done;
}
+ if (u->au && async_wait (&(fpp->common), u->au))
+ return;
+
if (u->flags.access == ACCESS_SEQUENTIAL
&& u->endfile == AFTER_ENDFILE)
{
@@ -401,6 +408,10 @@ st_rewind (st_parameter_filepos *fpp)
"Cannot REWIND a file opened for DIRECT access");
else
{
+
+ if (u->au && async_wait (&(fpp->common), u->au))
+ return;
+
/* If there are previously written bytes from a write with ADVANCE="no",
add a record marker before performing the ENDFILE. */
@@ -456,6 +467,9 @@ st_flush (st_parameter_filepos *fpp)
u = find_unit (fpp->common.unit);
if (u != NULL)
{
+ if (u->au && async_wait (&(fpp->common), u->au))
+ return;
+
/* Make sure format buffer is flushed. */
if (u->flags.form == FORM_FORMATTED)
fbuf_flush (u, u->mode);
===================================================================
@@ -26,6 +26,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
/* Implement the non-IOLENGTH variant of the INQUIRY statement */
#include "io.h"
+#include "async.h"
#include "unix.h"
#include <string.h>
@@ -281,12 +282,6 @@ inquire_via_unit (st_parameter_inquire *iqp, gfc_u
{
GFC_INTEGER_4 cf2 = iqp->flags2;
- if ((cf2 & IOPARM_INQUIRE_HAS_PENDING) != 0)
- *iqp->pending = 0;
-
- if ((cf2 & IOPARM_INQUIRE_HAS_ID) != 0)
- *iqp->id = 0;
-
if ((cf2 & IOPARM_INQUIRE_HAS_ENCODING) != 0)
{
if (u == NULL || u->flags.form != FORM_FORMATTED)
@@ -347,6 +342,27 @@ inquire_via_unit (st_parameter_inquire *iqp, gfc_u
cf_strcpy (iqp->asynchronous, iqp->asynchronous_len, p);
}
+ if ((cf2 & IOPARM_INQUIRE_HAS_PENDING) != 0)
+ {
+ if (u->au == NULL)
+ *(iqp->pending) = 0;
+ else
+ {
+ LOCK (&(u->au->lock));
+ if ((cf2 & IOPARM_INQUIRE_HAS_ID) != 0)
+ {
+ int id;
+ id = *(iqp->id);
+ *(iqp->pending) = id > u->au->id.low;
+ }
+ else
+ {
+ *(iqp->pending) = ! u->au->empty;
+ }
+ UNLOCK (&(u->au->lock));
+ }
+ }
+
if ((cf2 & IOPARM_INQUIRE_HAS_SIGN) != 0)
{
if (u == NULL)
===================================================================
@@ -531,7 +531,9 @@ typedef struct st_parameter_dt
/* A flag used to identify when a non-standard expanded namelist read
has occurred. */
unsigned expanded_read : 1;
- /* 13 unused bits. */
+ /* Flag to indicate if the statement has async="YES". */
+ unsigned async : 1;
+ /* 12 unused bits. */
int child_saved_iostat;
int nml_delim;
@@ -590,7 +592,7 @@ extern char check_st_parameter_dt[sizeof (((st_par
typedef struct
{
st_parameter_common common;
- CHARACTER1 (id);
+ GFC_INTEGER_4 *id;
}
st_parameter_wait;
@@ -659,6 +661,9 @@ typedef struct gfc_unit
int continued;
+ /* contains the pointer to the async unit */
+ struct async_unit *au;
+
__gthread_mutex_t lock;
/* Number of threads waiting to acquire this unit's lock.
When non-zero, close_unit doesn't only removes the unit
@@ -817,9 +822,16 @@ internal_proto(next_record);
extern void st_wait (st_parameter_wait *);
export_proto(st_wait);
+extern void st_wait_async (st_parameter_wait *);
+export_proto(st_wait_async);
+
extern void hit_eof (st_parameter_dt *);
internal_proto(hit_eof);
+extern void transfer_array_inner (st_parameter_dt *, gfc_array_char *, int,
+ gfc_charlen_type);
+internal_proto(transfer_array_inner);
+
/* read.c */
extern void set_integer (void *, GFC_INTEGER_LARGEST, int);
@@ -988,3 +1000,10 @@ memset4 (gfc_char4_t *p, gfc_char4_t c, int k)
#endif
+extern void
+st_write_done_worker (st_parameter_dt *dtp);
+internal_proto (st_write_done_worker);
+
+extern void
+st_read_done_worker (st_parameter_dt *dtp);
+internal_proto (st_read_done_worker);
===================================================================
@@ -26,6 +26,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
#include "io.h"
#include "fbuf.h"
#include "unix.h"
+#include "async.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
@@ -651,8 +652,12 @@ new_unit (st_parameter_open *opp, gfc_unit *u, uni
else
u->fbuf = NULL;
-
-
+ /* Check if asynchrounous. */
+ if (flags->async == ASYNC_YES)
+ u->au = init_async_unit (u);
+ else
+ u->au = NULL;
+
return u;
cleanup:
===================================================================
@@ -31,6 +31,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
#include "fbuf.h"
#include "format.h"
#include "unix.h"
+#include "async.h"
#include <string.h>
#include <errno.h>
@@ -184,6 +185,12 @@ static const st_option pad_opt[] = {
{NULL, 0}
};
+static const st_option async_opt[] = {
+ {"yes", ASYNC_YES},
+ {"no", ASYNC_NO},
+ {NULL, 0}
+};
+
typedef enum
{ FORMATTED_SEQUENTIAL, UNFORMATTED_SEQUENTIAL,
FORMATTED_DIRECT, UNFORMATTED_DIRECT, FORMATTED_STREAM, UNFORMATTED_STREAM
@@ -2281,7 +2288,38 @@ formatted_transfer (st_parameter_dt *dtp, bt type,
}
}
+/* Wrapper function for I/O of scalar types. If this should be an async I/O
+ request, queue it. For a synchronous write on an async unit, perform the
+ wait operation and return an error. For all synchronous writes, call the
+ right transfer function. */
+static void
+wrap_scalar_transfer (st_parameter_dt *dtp, bt type, void *p, int kind,
+ size_t size, size_t n_elem)
+{
+ if (dtp->u.p.current_unit && dtp->u.p.current_unit->au)
+ {
+ if (dtp->u.p.async)
+ {
+ transfer_args args;
+ args.scalar.transfer = dtp->u.p.transfer;
+ args.scalar.arg_bt = type;
+ args.scalar.data = p;
+ args.scalar.i = kind;
+ args.scalar.s1 = size;
+ args.scalar.s2 = n_elem;
+ enqueue_transfer(dtp->u.p.current_unit->au, &args, AIO_TRANSFER_SCALAR);
+ return;
+ }
+ }
+ /* Come here if there was no asynchronous I/O to be scheduled. */
+ if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
+ return;
+
+ dtp->u.p.transfer (dtp, type, p, kind, size, 1);
+}
+
+
/* Data transfer entry points. The type of the data entity is
implicit in the subroutine call. This prevents us from having to
share a common enum with the compiler. */
@@ -2289,9 +2327,7 @@ formatted_transfer (st_parameter_dt *dtp, bt type,
void
transfer_integer (st_parameter_dt *dtp, void *p, int kind)
{
- if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
- return;
- dtp->u.p.transfer (dtp, BT_INTEGER, p, kind, kind, 1);
+ wrap_scalar_transfer (dtp, BT_INTEGER, p, kind, kind, 1);
}
void
@@ -2307,7 +2343,7 @@ transfer_real (st_parameter_dt *dtp, void *p, int
if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
return;
size = size_from_real_kind (kind);
- dtp->u.p.transfer (dtp, BT_REAL, p, kind, size, 1);
+ wrap_scalar_transfer(dtp, BT_REAL, p, kind, size, 1);
}
void
@@ -2319,9 +2355,7 @@ transfer_real_write (st_parameter_dt *dtp, void *p
void
transfer_logical (st_parameter_dt *dtp, void *p, int kind)
{
- if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
- return;
- dtp->u.p.transfer (dtp, BT_LOGICAL, p, kind, kind, 1);
+ wrap_scalar_transfer(dtp, BT_LOGICAL, p, kind, kind, 1);
}
void
@@ -2345,7 +2379,7 @@ transfer_character (st_parameter_dt *dtp, void *p,
p = empty_string;
/* Set kind here to 1. */
- dtp->u.p.transfer (dtp, BT_CHARACTER, p, 1, len, 1);
+ wrap_scalar_transfer(dtp, BT_CHARACTER, p, 1, len, 1);
}
void
@@ -2369,7 +2403,7 @@ transfer_character_wide (st_parameter_dt *dtp, voi
p = empty_string;
/* Here we pass the actual kind value. */
- dtp->u.p.transfer (dtp, BT_CHARACTER, p, kind, len, 1);
+ wrap_scalar_transfer(dtp, BT_CHARACTER, p, kind, len, 1);
}
void
@@ -2385,7 +2419,7 @@ transfer_complex (st_parameter_dt *dtp, void *p, i
if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
return;
size = size_from_complex_kind (kind);
- dtp->u.p.transfer (dtp, BT_COMPLEX, p, kind, size, 1);
+ wrap_scalar_transfer(dtp, BT_COMPLEX, p, kind, size, 1);
}
void
@@ -2395,8 +2429,8 @@ transfer_complex_write (st_parameter_dt *dtp, void
}
void
-transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind,
- gfc_charlen_type charlen)
+transfer_array_inner (st_parameter_dt *dtp, gfc_array_char *desc, int kind,
+ gfc_charlen_type charlen)
{
index_type count[GFC_MAX_DIMENSIONS];
index_type extent[GFC_MAX_DIMENSIONS];
@@ -2471,6 +2505,34 @@ void
}
void
+transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind,
+ gfc_charlen_type charlen)
+{
+ if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
+ return;
+
+ if (dtp->u.p.current_unit && dtp->u.p.current_unit->au)
+ {
+ if (dtp->u.p.async)
+ {
+ transfer_args args;
+ size_t sz = sizeof (gfc_array_char)
+ + sizeof (descriptor_dimension) * GFC_DESCRIPTOR_RANK (desc);
+ args.array.desc = xmalloc (sz);
+ NOTE("desc = %p", (void *) args.array.desc);
+ memcpy (args.array.desc, desc, sz);
+ args.array.kind = kind;
+ args.array.charlen = charlen;
+ enqueue_transfer(dtp->u.p.current_unit->au, &args, AIO_TRANSFER_ARRAY);
+ return;
+ }
+ }
+ /* Come here if there was no asynchronous I/O to be scheduled. */
+ transfer_array_inner (dtp, desc, kind, charlen);
+}
+
+
+void
transfer_array_write (st_parameter_dt *dtp, gfc_array_char *desc, int kind,
gfc_charlen_type charlen)
{
@@ -2492,7 +2554,7 @@ transfer_derived (st_parameter_dt *parent, void *d
else
parent->u.p.fdtio_ptr = (formatted_dtio) dtio_proc;
}
- parent->u.p.transfer (parent, BT_CLASS, dtio_source, 0, 0, 1);
+ wrap_scalar_transfer(parent, BT_CLASS, dtio_source, 0, 0, 1);
}
@@ -2770,6 +2832,29 @@ data_transfer_init (st_parameter_dt *dtp, int read
else if (dtp->u.p.current_unit->internal_unit_kind > 0)
dtp->u.p.unit_is_internal = 1;
+ if ((cf & IOPARM_DT_HAS_ASYNCHRONOUS) != 0)
+ {
+ int f;
+ f = find_option (&dtp->common, dtp->asynchronous, dtp->asynchronous_len,
+ async_opt, "Bad ASYNCHRONOUS in data transfer statement");
+ if (f == ASYNC_YES && dtp->u.p.current_unit->flags.async != ASYNC_YES)
+ {
+ generate_error (&dtp->common, LIBERROR_OPTION_CONFLICT,
+ "ASYNCHRONOUS transfer without ASYHCRONOUS='YES' in OPEN");
+ return;
+ }
+ dtp->u.p.async = f == ASYNC_YES;
+ }
+
+ /* Perform a wait operation for any pending asynchronous I/O. This needs to
+ be done before all other error checks. See F2008, 9.6.4.1. */
+
+ if (dtp->u.p.current_unit->au && !dtp->u.p.async)
+ {
+ if (async_wait (&(dtp->common), dtp->u.p.current_unit->au))
+ return;
+ }
+
/* Check the action. */
if (read_flag && dtp->u.p.current_unit->flags.action == ACTION_WRITE)
@@ -3184,6 +3269,9 @@ data_transfer_init (st_parameter_dt *dtp, int read
dtp->u.p.current_unit->read_bad = 1;
}
+ if (dtp->u.p.current_unit->au)
+ change_pdt(dtp->u.p.current_unit->au, dtp);
+
if (dtp->u.p.current_unit->flags.form == FORM_FORMATTED)
{
#ifdef HAVE_USELOCALE
@@ -4099,7 +4187,7 @@ extern void st_read_done (st_parameter_dt *);
export_proto(st_read_done);
void
-st_read_done (st_parameter_dt *dtp)
+st_read_done_worker (st_parameter_dt *dtp)
{
finalize_transfer (dtp);
@@ -4127,6 +4215,24 @@ void
free_format_data (dtp->u.p.fmt);
free_format (dtp);
}
+ }
+}
+
+void
+st_read_done(st_parameter_dt *dtp)
+{
+ if (dtp->u.p.current_unit)
+ {
+ if (dtp->u.p.current_unit->au)
+ {
+ if (dtp->common.flags & IOPARM_DT_HAS_ID)
+ *dtp->id = enqueue_done_id (dtp->u.p.current_unit->au, AIO_READ_DONE);
+ else
+ enqueue_done (dtp->u.p.current_unit->au, AIO_READ_DONE);
+ }
+ else
+ st_read_done_worker (dtp);
+
unlock_unit (dtp->u.p.current_unit);
}
@@ -4143,11 +4249,9 @@ st_write (st_parameter_dt *dtp)
data_transfer_init (dtp, 0);
}
-extern void st_write_done (st_parameter_dt *);
-export_proto(st_write_done);
void
-st_write_done (st_parameter_dt *dtp)
+st_write_done_worker (st_parameter_dt *dtp)
{
finalize_transfer (dtp);
@@ -4196,19 +4300,61 @@ void
free_format_data (dtp->u.p.fmt);
free_format (dtp);
}
+ }
+}
+
+extern void st_write_done (st_parameter_dt *);
+export_proto(st_write_done);
+
+void
+st_write_done (st_parameter_dt *dtp)
+{
+ if (dtp->u.p.current_unit)
+ {
+ if (dtp->u.p.current_unit->au)
+ {
+ if (dtp->common.flags & IOPARM_DT_HAS_ID)
+ *dtp->id = enqueue_done_id(dtp->u.p.current_unit->au, AIO_WRITE_DONE);
+ else
+ enqueue_done (dtp->u.p.current_unit->au, AIO_WRITE_DONE);
+ }
+ else
+ st_write_done_worker (dtp);
+
unlock_unit (dtp->u.p.current_unit);
}
+
library_end ();
}
+/* Wait operation. We need to keep around the do-nothing version
+ of st_wait for compatibility with previous versions, which had marked
+ the argument as unused (and thus liable to be removed).
-/* F2003: This is a stub for the runtime portion of the WAIT statement. */
+ TODO: remove at next bump in version number. */
+
void
st_wait (st_parameter_wait *wtp __attribute__((unused)))
{
+ return;
}
+void
+st_wait_async (st_parameter_wait *wtp)
+{
+ gfc_unit *u = find_unit(wtp->common.unit);
+ if (u->au)
+ {
+ if (wtp->common.flags & IOPARM_WAIT_HAS_ID)
+ async_wait_id (&(wtp->common), u->au, *wtp->id);
+ else
+ async_wait (&(wtp->common), u->au);
+ }
+ unlock_unit(u);
+}
+
+
/* Receives the scalar information for namelist objects and stores it
in a linked list of namelist_info types. */
===================================================================
@@ -27,6 +27,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
#include "fbuf.h"
#include "format.h"
#include "unix.h"
+#include "async.h"
#include <string.h>
#include <assert.h>
@@ -240,7 +241,7 @@ insert_unit (int n)
#else
__GTHREAD_MUTEX_INIT_FUNCTION (&u->lock);
#endif
- __gthread_mutex_lock (&u->lock);
+ LOCK (&u->lock);
u->priority = pseudo_random ();
unit_root = insert (u, unit_root);
return u;
@@ -327,7 +328,9 @@ get_gfc_unit (int n, int do_create)
gfc_unit *p;
int c, created = 0;
- __gthread_mutex_lock (&unit_lock);
+ NOTE("Unit n=%d, do_create = %d", n, do_create);
+ LOCK(&unit_lock);
+
retry:
for (c = 0; c < CACHE_SIZE; c++)
if (unit_cache[c] != NULL && unit_cache[c]->unit_number == n)
@@ -366,7 +369,7 @@ retry:
{
/* Newly created units have their lock held already
from insert_unit. Just unlock UNIT_LOCK and return. */
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK(&unit_lock);
return p;
}
@@ -374,10 +377,10 @@ found:
if (p != NULL && (p->child_dtio == 0))
{
/* Fast path. */
- if (! __gthread_mutex_trylock (&p->lock))
+ if (! TRYLOCK (&p->lock))
{
/* assert (p->closed == 0); */
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK(&unit_lock);
return p;
}
@@ -385,15 +388,15 @@ found:
}
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK (&unit_lock);
if (p != NULL && (p->child_dtio == 0))
{
- __gthread_mutex_lock (&p->lock);
+ LOCK (&p->lock);
if (p->closed)
{
- __gthread_mutex_lock (&unit_lock);
- __gthread_mutex_unlock (&p->lock);
+ LOCK (&unit_lock);
+ UNLOCK (&p->lock);
if (predec_waiting_locked (p) == 0)
destroy_unit_mutex (p);
goto retry;
@@ -640,7 +643,7 @@ init_units (void)
fbuf_init (u, 0);
- __gthread_mutex_unlock (&u->lock);
+ UNLOCK (&u->lock);
}
if (options.stdout_unit >= 0)
@@ -671,7 +674,7 @@ init_units (void)
fbuf_init (u, 0);
- __gthread_mutex_unlock (&u->lock);
+ UNLOCK (&u->lock);
}
if (options.stderr_unit >= 0)
@@ -702,13 +705,13 @@ init_units (void)
fbuf_init (u, 256); /* 256 bytes should be enough, probably not doing
any kind of exotic formatting to stderr. */
- __gthread_mutex_unlock (&u->lock);
+ UNLOCK (&u->lock);
}
/* The default internal units. */
u = insert_unit (GFC_INTERNAL_UNIT);
- __gthread_mutex_unlock (&u->lock);
+ UNLOCK (&u->lock);
u = insert_unit (GFC_INTERNAL_UNIT4);
- __gthread_mutex_unlock (&u->lock);
+ UNLOCK (&u->lock);
}
@@ -717,6 +720,9 @@ close_unit_1 (gfc_unit *u, int locked)
{
int i, rc;
+ if(u->au)
+ async_close (u->au);
+
/* If there are previously written bytes from a write with ADVANCE="no"
Reposition the buffer before closing. */
if (u->previous_nonadvancing_write)
@@ -723,10 +729,10 @@ close_unit_1 (gfc_unit *u, int locked)
finish_last_advance_record (u);
rc = (u->s == NULL) ? 0 : sclose (u->s) == -1;
-
+
u->closed = 1;
if (!locked)
- __gthread_mutex_lock (&unit_lock);
+ LOCK (&unit_lock);
for (i = 0; i < CACHE_SIZE; i++)
if (unit_cache[i] == u)
@@ -744,7 +750,7 @@ close_unit_1 (gfc_unit *u, int locked)
newunit_free (u->unit_number);
if (!locked)
- __gthread_mutex_unlock (&u->lock);
+ UNLOCK (&u->lock);
/* If there are any threads waiting in find_unit for this unit,
avoid freeing the memory, the last such thread will free it
@@ -753,7 +759,7 @@ close_unit_1 (gfc_unit *u, int locked)
destroy_unit_mutex (u);
if (!locked)
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK (&unit_lock);
return rc;
}
@@ -761,7 +767,9 @@ close_unit_1 (gfc_unit *u, int locked)
void
unlock_unit (gfc_unit *u)
{
- __gthread_mutex_unlock (&u->lock);
+ NOTE ("unlock_unit = %d", u->unit_number);
+ UNLOCK (&u->lock);
+ NOTE ("unlock_unit done");
}
/* close_unit()-- Close a unit. The stream is closed, and any memory
@@ -785,10 +793,10 @@ close_unit (gfc_unit *u)
void
close_units (void)
{
- __gthread_mutex_lock (&unit_lock);
+ LOCK (&unit_lock);
while (unit_root != NULL)
close_unit_1 (unit_root, 1);
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK (&unit_lock);
free (newunits);
@@ -895,7 +903,7 @@ finish_last_advance_record (gfc_unit *u)
int
newunit_alloc (void)
{
- __gthread_mutex_lock (&unit_lock);
+ LOCK (&unit_lock);
if (!newunits)
{
newunits = xcalloc (16, 1);
@@ -909,7 +917,7 @@ newunit_alloc (void)
{
newunits[ii] = true;
newunit_lwi = ii + 1;
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK (&unit_lock);
return -ii + NEWUNIT_START;
}
}
@@ -922,7 +930,7 @@ newunit_alloc (void)
memset (newunits + old_size, 0, old_size);
newunits[old_size] = true;
newunit_lwi = old_size + 1;
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK (&unit_lock);
return -old_size + NEWUNIT_START;
}
===================================================================
@@ -27,6 +27,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
#include "io.h"
#include "unix.h"
+#include "async.h"
#include <limits.h>
#ifdef HAVE_UNISTD_H
@@ -1742,7 +1743,7 @@ find_file (const char *file, gfc_charlen_type file
id = id_from_path (path);
#endif
- __gthread_mutex_lock (&unit_lock);
+ LOCK (&unit_lock);
retry:
u = find_file0 (unit_root, FIND_FILE0_ARGS);
if (u != NULL)
@@ -1751,20 +1752,20 @@ retry:
if (! __gthread_mutex_trylock (&u->lock))
{
/* assert (u->closed == 0); */
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK (&unit_lock);
goto done;
}
inc_waiting_locked (u);
}
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK (&unit_lock);
if (u != NULL)
{
- __gthread_mutex_lock (&u->lock);
+ LOCK (&u->lock);
if (u->closed)
{
- __gthread_mutex_lock (&unit_lock);
- __gthread_mutex_unlock (&u->lock);
+ LOCK (&unit_lock);
+ UNLOCK (&u->lock);
if (predec_waiting_locked (u) == 0)
free (u);
goto retry;
@@ -1794,7 +1795,7 @@ flush_all_units_1 (gfc_unit *u, int min_unit)
return u;
if (u->s)
sflush (u->s);
- __gthread_mutex_unlock (&u->lock);
+ UNLOCK (&u->lock);
}
u = u->right;
}
@@ -1807,17 +1808,17 @@ flush_all_units (void)
gfc_unit *u;
int min_unit = 0;
- __gthread_mutex_lock (&unit_lock);
+ LOCK (&unit_lock);
do
{
u = flush_all_units_1 (unit_root, min_unit);
if (u != NULL)
inc_waiting_locked (u);
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK (&unit_lock);
if (u == NULL)
return;
- __gthread_mutex_lock (&u->lock);
+ LOCK (&u->lock);
min_unit = u->unit_number + 1;
@@ -1824,14 +1825,14 @@ flush_all_units (void)
if (u->closed == 0)
{
sflush (u->s);
- __gthread_mutex_lock (&unit_lock);
- __gthread_mutex_unlock (&u->lock);
+ LOCK (&unit_lock);
+ UNLOCK (&u->lock);
(void) predec_waiting_locked (u);
}
else
{
- __gthread_mutex_lock (&unit_lock);
- __gthread_mutex_unlock (&u->lock);
+ LOCK (&unit_lock);
+ UNLOCK (&u->lock);
if (predec_waiting_locked (u) == 0)
free (u);
}
===================================================================
@@ -743,6 +743,9 @@ internal_proto(translate_error);
extern void generate_error (st_parameter_common *, int, const char *);
iexport_proto(generate_error);
+extern bool generate_error_common (st_parameter_common *, int, const char *);
+iexport_proto(generate_error_common);
+
extern void generate_warning (st_parameter_common *, const char *);
internal_proto(generate_warning);
@@ -1748,5 +1751,7 @@ void cshift1_16_c16 (gfc_array_c16 * const restric
internal_proto(cshift1_16_c16);
#endif
+/* Define this if we support asynchronous I/O on this platform. This
+ currently requires weak symbols. */
#endif /* LIBGFOR_H */
===================================================================
@@ -24,6 +24,9 @@ see the files COPYING3 and COPYING.RUNTIME respect
#include "libgfortran.h"
+#include "io.h"
+#include "async.h"
+
#include <assert.h>
#include <string.h>
#include <errno.h>
@@ -526,24 +529,30 @@ translate_error (int code)
}
-/* generate_error()-- Come here when an error happens. This
- * subroutine is called if it is possible to continue on after the error.
- * If an IOSTAT or IOMSG variable exists, we set it. If IOSTAT or
- * ERR labels are present, we return, otherwise we terminate the program
- * after printing a message. The error code is always required but the
- * message parameter can be NULL, in which case a string describing
- * the most recent operating system error is used. */
+/* Worker function for generate_error and generate_error_async. Return true
+ if a straight return is to be done, zero if the program should abort. */
-void
-generate_error (st_parameter_common *cmp, int family, const char *message)
+bool
+generate_error_common (st_parameter_common *cmp, int family, const char *message)
{
char errmsg[STRERR_MAXSZ];
+ gfc_unit *u;
+ u = thread_unit;
+ if (u && u->au && __gthread_equal (u->au->thread, __gthread_self ()))
+ {
+ u->au->error.has_error = 1;
+ u->au->error.cmp = cmp;
+ u->au->error.family = family;
+ u->au->error.message = message;
+ return true;
+ }
+
/* If there was a previous error, don't mask it with another
error message, EOF or EOR condition. */
if ((cmp->flags & IOPARM_LIBRETURN_MASK) == IOPARM_LIBRETURN_ERROR)
- return;
+ return true;
/* Set the error status. */
if ((cmp->flags & IOPARM_HAS_IOSTAT))
@@ -564,27 +573,28 @@ translate_error (int code)
case LIBERROR_EOR:
cmp->flags |= IOPARM_LIBRETURN_EOR;
if ((cmp->flags & IOPARM_EOR))
- return;
+ return true;
break;
case LIBERROR_END:
cmp->flags |= IOPARM_LIBRETURN_END;
if ((cmp->flags & IOPARM_END))
- return;
+ return true;
break;
default:
cmp->flags |= IOPARM_LIBRETURN_ERROR;
if ((cmp->flags & IOPARM_ERR))
- return;
+ return true;
break;
}
/* Return if the user supplied an iostat variable. */
if ((cmp->flags & IOPARM_HAS_IOSTAT))
- return;
+ return true;
- /* Terminate the program */
+ /* Return code, caller is responsible for terminating
+ the program if necessary. */
recursion_check ();
show_locus (cmp);
@@ -591,8 +601,27 @@ translate_error (int code)
estr_write ("Fortran runtime error: ");
estr_write (message);
estr_write ("\n");
- exit_error (2);
+ return false;
}
+
+/* generate_error()-- Come here when an error happens. This
+ * subroutine is called if it is possible to continue on after the error.
+ * If an IOSTAT or IOMSG variable exists, we set it. If IOSTAT or
+ * ERR labels are present, we return, otherwise we terminate the program
+ * after printing a message. The error code is always required but the
+ * message parameter can be NULL, in which case a string describing
+ * the most recent operating system error is used.
+ * If the error is for an asynchronous unit and if the program is currently
+ * executing the asynchronous thread, just mark the error and return. */
+
+void
+generate_error (st_parameter_common *cmp, int family, const char *message)
+{
+ if (generate_error_common (cmp, family, message))
+ return;
+
+ exit_error(2);
+}
iexport(generate_error);