FreeBSD kernel kern code
kern_alq.c
1 /*-
2  * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
3  * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
4  * Copyright (c) 2009-2010, The FreeBSD Foundation
5  * All rights reserved.
6  *
7  * Portions of this software were developed at the Centre for Advanced
8  * Internet Architectures, Swinburne University of Technology, Melbourne,
9  * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions
13  * are met:
14  * 1. Redistributions of source code must retain the above copyright
15  * notice unmodified, this list of conditions, and the following
16  * disclaimer.
17  * 2. Redistributions in binary form must reproduce the above copyright
18  * notice, this list of conditions and the following disclaimer in the
19  * documentation and/or other materials provided with the distribution.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
22  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
23  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
24  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
25  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
26  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
30  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #include <sys/cdefs.h>
34 __FBSDID("$BSDSUniX$");
35 
36 #include "opt_mac.h"
37 
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/kthread.h>
42 #include <sys/lock.h>
43 #include <sys/mount.h>
44 #include <sys/mutex.h>
45 #include <sys/namei.h>
46 #include <sys/proc.h>
47 #include <sys/vnode.h>
48 #include <sys/alq.h>
49 #include <sys/malloc.h>
50 #include <sys/unistd.h>
51 #include <sys/fcntl.h>
52 #include <sys/eventhandler.h>
53 
54 #include <security/mac/mac_framework.h>
55 
56 /* Async. Logging Queue */
57 struct alq {
58  char *aq_entbuf; /* Buffer for stored entries */
59  int aq_entmax; /* Max entries */
60  int aq_entlen; /* Entry length */
61  int aq_freebytes; /* Bytes available in buffer */
62  int aq_buflen; /* Total length of our buffer */
63  int aq_writehead; /* Location for next write */
64  int aq_writetail; /* Flush starts at this location */
65  int aq_wrapearly; /* # bytes left blank at end of buf */
66  int aq_flags; /* Queue flags */
67  int aq_waiters; /* Num threads waiting for resources
68  * NB: Used as a wait channel so must
69  * not be first field in the alq struct
70  */
71  struct ale aq_getpost; /* ALE for use by get/post */
72  struct mtx aq_mtx; /* Queue lock */
73  struct vnode *aq_vp; /* Open vnode handle */
74  struct ucred *aq_cred; /* Credentials of the opening thread */
75  LIST_ENTRY(alq) aq_act; /* List of active queues */
76  LIST_ENTRY(alq) aq_link; /* List of all queues */
77 };
78 
79 #define AQ_WANTED 0x0001 /* Wakeup sleeper when io is done */
80 #define AQ_ACTIVE 0x0002 /* on the active list */
81 #define AQ_FLUSHING 0x0004 /* doing IO */
82 #define AQ_SHUTDOWN 0x0008 /* Queue no longer valid */
83 #define AQ_ORDERED 0x0010 /* Queue enforces ordered writes */
84 #define AQ_LEGACY 0x0020 /* Legacy queue (fixed length writes) */
85 
86 #define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx)
87 #define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)
88 
89 #define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
90 
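/*
 * The fields above implement a simple circular buffer. As an illustrative
 * example (not part of the original source): with aq_buflen = 10,
 * aq_writetail = 7 and aq_writehead = 3, bytes 7-9 and 0-2 are pending,
 * aq_freebytes = 4, and HAS_PENDING_DATA() is true because
 * aq_freebytes != aq_buflen. aq_wrapearly counts bytes deliberately left
 * unused at the end of the buffer when a writer wraps early; alq_doio()
 * skips over and reclaims them when it flushes.
 */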
91 static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
92 
93 /*
94  * The ald_mtx protects the ald_queues list and the ald_active list.
95  */
96 static struct mtx ald_mtx;
97 static LIST_HEAD(, alq) ald_queues;
98 static LIST_HEAD(, alq) ald_active;
99 static int ald_shutingdown = 0;
100 struct thread *ald_thread;
101 static struct proc *ald_proc;
102 static eventhandler_tag alq_eventhandler_tag = NULL;
103 
104 #define ALD_LOCK() mtx_lock(&ald_mtx)
105 #define ALD_UNLOCK() mtx_unlock(&ald_mtx)
106 
107 /* Daemon functions */
108 static int ald_add(struct alq *);
109 static int ald_rem(struct alq *);
110 static void ald_startup(void *);
111 static void ald_daemon(void);
112 static void ald_shutdown(void *, int);
113 static void ald_activate(struct alq *);
114 static void ald_deactivate(struct alq *);
115 
116 /* Internal queue functions */
117 static void alq_shutdown(struct alq *);
118 static void alq_destroy(struct alq *);
119 static int alq_doio(struct alq *);
120 
121 
122 /*
123  * Add a new queue to the global list. Fail if we're shutting down.
124  */
125 static int
126 ald_add(struct alq *alq)
127 {
128  int error;
129 
130  error = 0;
131 
132  ALD_LOCK();
133  if (ald_shutingdown) {
134  error = EBUSY;
135  goto done;
136  }
137  LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
138 done:
139  ALD_UNLOCK();
140  return (error);
141 }
142 
143 /*
144  * Remove a queue from the global list unless we're shutting down. If so,
145  * the ald will take care of cleaning up its resources.
146  */
147 static int
148 ald_rem(struct alq *alq)
149 {
150  int error;
151 
152  error = 0;
153 
154  ALD_LOCK();
155  if (ald_shutingdown) {
156  error = EBUSY;
157  goto done;
158  }
159  LIST_REMOVE(alq, aq_link);
160 done:
161  ALD_UNLOCK();
162  return (error);
163 }
164 
165 /*
166  * Put a queue on the active list. This will schedule it for writing.
167  */
168 static void
169 ald_activate(struct alq *alq)
170 {
171  LIST_INSERT_HEAD(&ald_active, alq, aq_act);
172  wakeup(&ald_active);
173 }
174 
175 static void
176 ald_deactivate(struct alq *alq)
177 {
178  LIST_REMOVE(alq, aq_act);
179  alq->aq_flags &= ~AQ_ACTIVE;
180 }
181 
182 static void
183 ald_startup(void *unused)
184 {
185  mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
186  LIST_INIT(&ald_queues);
187  LIST_INIT(&ald_active);
188 }
189 
190 static void
191 ald_daemon(void)
192 {
193  int needwakeup;
194  struct alq *alq;
195 
196  ald_thread = FIRST_THREAD_IN_PROC(ald_proc);
197 
198  alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
199  ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);
200 
201  ALD_LOCK();
202 
203  for (;;) {
204  while ((alq = LIST_FIRST(&ald_active)) == NULL &&
205  !ald_shutingdown)
206  mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);
207 
208  /* Don't shutdown until all active ALQs are flushed. */
209  if (ald_shutingdown && alq == NULL) {
210  ALD_UNLOCK();
211  break;
212  }
213 
214  ALQ_LOCK(alq);
215  ald_deactivate(alq);
216  ALD_UNLOCK();
217  needwakeup = alq_doio(alq);
218  ALQ_UNLOCK(alq);
219  if (needwakeup)
220  wakeup_one(alq);
221  ALD_LOCK();
222  }
223 
224  kproc_exit(0);
225 }
226 
227 static void
228 ald_shutdown(void *arg, int howto)
229 {
230  struct alq *alq;
231 
232  ALD_LOCK();
233 
234  /* Ensure no new queues can be created. */
235  ald_shutingdown = 1;
236 
237  /* Shutdown all ALQs prior to terminating the ald_daemon. */
238  while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
239  LIST_REMOVE(alq, aq_link);
240  ALD_UNLOCK();
241  alq_shutdown(alq);
242  ALD_LOCK();
243  }
244 
245  /* At this point, all ALQs are flushed and shutdown. */
246 
247  /*
248  * Wake ald_daemon so that it exits. It won't be able to do
249  * anything until we mtx_sleep because we hold the ald_mtx.
250  */
251  wakeup(&ald_active);
252 
253  /* Wait for ald_daemon to exit. */
254  mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);
255 
256  ALD_UNLOCK();
257 }
258 
259 static void
260 alq_shutdown(struct alq *alq)
261 {
262  ALQ_LOCK(alq);
263 
264  /* Stop any new writers. */
265  alq->aq_flags |= AQ_SHUTDOWN;
266 
267  /*
268  * If the ALQ isn't active but has unwritten data (possible if
269  * the ALQ_NOACTIVATE flag has been used), explicitly activate the
270  * ALQ here so that the pending data gets flushed by the ald_daemon.
271  */
272  if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
273  alq->aq_flags |= AQ_ACTIVE;
274  ALQ_UNLOCK(alq);
275  ALD_LOCK();
276  ald_activate(alq);
277  ALD_UNLOCK();
278  ALQ_LOCK(alq);
279  }
280 
281  /* Drain IO */
282  while (alq->aq_flags & AQ_ACTIVE) {
283  alq->aq_flags |= AQ_WANTED;
284  msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
285  }
286  ALQ_UNLOCK(alq);
287 
288  vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
289  curthread);
290  crfree(alq->aq_cred);
291 }
292 
293 void
294 alq_destroy(struct alq *alq)
295 {
296  /* Drain all pending IO. */
297  alq_shutdown(alq);
298 
299  mtx_destroy(&alq->aq_mtx);
300  free(alq->aq_entbuf, M_ALD);
301  free(alq, M_ALD);
302 }
303 
304 /*
305  * Flush all pending data to disk. This operation will block.
306  */
307 static int
308 alq_doio(struct alq *alq)
309 {
310  struct thread *td;
311  struct mount *mp;
312  struct vnode *vp;
313  struct uio auio;
314  struct iovec aiov[2];
315  int totlen;
316  int iov;
317  int vfslocked;
318  int wrapearly;
319 
320  KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
321 
322  vp = alq->aq_vp;
323  td = curthread;
324  totlen = 0;
325  iov = 1;
326  wrapearly = alq->aq_wrapearly;
327 
328  bzero(&aiov, sizeof(aiov));
329  bzero(&auio, sizeof(auio));
330 
331  /* Start the write from the location of our buffer tail pointer. */
332  aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;
333 
334  if (alq->aq_writetail < alq->aq_writehead) {
335  /* Buffer not wrapped. */
336  totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
337  } else if (alq->aq_writehead == 0) {
338  /* Buffer not wrapped (special case to avoid an empty iov). */
339  totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
340  wrapearly;
341  } else {
342  /*
343  * Buffer wrapped, requires 2 aiov entries:
344  * - first is from writetail to end of buffer
345  * - second is from start of buffer to writehead
346  */
347  aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
348  wrapearly;
349  iov++;
350  aiov[1].iov_base = alq->aq_entbuf;
351  aiov[1].iov_len = alq->aq_writehead;
352  totlen = aiov[0].iov_len + aiov[1].iov_len;
353  }
354 
355  alq->aq_flags |= AQ_FLUSHING;
356  ALQ_UNLOCK(alq);
357 
358  auio.uio_iov = &aiov[0];
359  auio.uio_offset = 0;
360  auio.uio_segflg = UIO_SYSSPACE;
361  auio.uio_rw = UIO_WRITE;
362  auio.uio_iovcnt = iov;
363  auio.uio_resid = totlen;
364  auio.uio_td = td;
365 
366  /*
367  * Do all of the junk required to write now.
368  */
369  vfslocked = VFS_LOCK_GIANT(vp->v_mount);
370  vn_start_write(vp, &mp, V_WAIT);
371  vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
372  /*
373  * XXX: VOP_WRITE error checks are ignored.
374  */
375 #ifdef MAC
376  if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
377 #endif
378  VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
379  VOP_UNLOCK(vp, 0);
380  vn_finished_write(mp);
381  VFS_UNLOCK_GIANT(vfslocked);
382 
383  ALQ_LOCK(alq);
384  alq->aq_flags &= ~AQ_FLUSHING;
385 
386  /* Adjust writetail as required, taking into account wrapping. */
387  alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
388  alq->aq_buflen;
389  alq->aq_freebytes += totlen + wrapearly;
390 
391  /*
392  * If we just flushed part of the buffer which wrapped, reset the
393  * wrapearly indicator.
394  */
395  if (wrapearly)
396  alq->aq_wrapearly = 0;
397 
398  /*
399  * If we just flushed the buffer completely, reset indexes to 0 to
400  * minimise buffer wraps.
401  * This is also required to ensure alq_getn() can't wedge itself.
402  */
403  if (!HAS_PENDING_DATA(alq))
404  alq->aq_writehead = alq->aq_writetail = 0;
405 
406  KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
407  ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));
408 
409  if (alq->aq_flags & AQ_WANTED) {
410  alq->aq_flags &= ~AQ_WANTED;
411  return (1);
412  }
413 
414  return(0);
415 }
416 
417 static struct kproc_desc ald_kp = {
418  "ALQ Daemon",
419  ald_daemon,
420  &ald_proc
421 };
422 
423 SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
424 SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);
425 
426 
427 /* User visible queue functions */
428 
429 /*
430  * Create the queue data structure, allocate the buffer, and open the file.
431  */
432 
433 int
434 alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
435  int size, int flags)
436 {
437  struct thread *td;
438  struct nameidata nd;
439  struct alq *alq;
440  int oflags;
441  int error;
442  int vfslocked;
443 
444  KASSERT((size > 0), ("%s: size <= 0", __func__));
445 
446  *alqp = NULL;
447  td = curthread;
448 
449  NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
450  oflags = FWRITE | O_NOFOLLOW | O_CREAT;
451 
452  error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
453  if (error)
454  return (error);
455 
456  vfslocked = NDHASGIANT(&nd);
457  NDFREE(&nd, NDF_ONLY_PNBUF);
458  /* We just unlock so we hold a reference */
459  VOP_UNLOCK(nd.ni_vp, 0);
460  VFS_UNLOCK_GIANT(vfslocked);
461 
462  alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
463  alq->aq_vp = nd.ni_vp;
464  alq->aq_cred = crhold(cred);
465 
466  mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
467 
468  alq->aq_buflen = size;
469  alq->aq_entmax = 0;
470  alq->aq_entlen = 0;
471 
472  alq->aq_freebytes = alq->aq_buflen;
473  alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
474  alq->aq_writehead = alq->aq_writetail = 0;
475  if (flags & ALQ_ORDERED)
476  alq->aq_flags |= AQ_ORDERED;
477 
478  if ((error = ald_add(alq)) != 0) {
479  alq_destroy(alq);
480  return (error);
481  }
482 
483  *alqp = alq;
484 
485  return (0);
486 }
487 
488 int
489 alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
490  int size, int count)
491 {
492  int ret;
493 
494  KASSERT((count >= 0), ("%s: count < 0", __func__));
495 
496  if (count > 0) {
497  if ((ret = alq_open_flags(alqp, file, cred, cmode,
498  size*count, 0)) == 0) {
499  (*alqp)->aq_flags |= AQ_LEGACY;
500  (*alqp)->aq_entmax = count;
501  (*alqp)->aq_entlen = size;
502  }
503  } else
504  ret = alq_open_flags(alqp, file, cred, cmode, size, 0);
505 
506  return (ret);
507 }
508 
509 
510 /*
511  * Copy a new entry into the queue. If the operation would block, either
512  * wait or return an error, depending on the ALQ_WAITOK/ALQ_NOWAIT flag.
513  */
514 int
515 alq_writen(struct alq *alq, void *data, int len, int flags)
516 {
517  int activate, copy, ret;
518  void *waitchan;
519 
520  KASSERT((len > 0 && len <= alq->aq_buflen),
521  ("%s: len <= 0 || len > aq_buflen", __func__));
522 
523  activate = ret = 0;
524  copy = len;
525  waitchan = NULL;
526 
527  ALQ_LOCK(alq);
528 
529  /*
530  * Fail to perform the write and return EWOULDBLOCK if:
531  * - The message is larger than our underlying buffer.
532  * - The ALQ is being shutdown.
533  * - There is insufficient free space in our underlying buffer
534  * to accept the message and the user can't wait for space.
535  * - There is insufficient free space in our underlying buffer
536  * to accept the message and the alq is inactive due to prior
537  * use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
538  */
539  if (len > alq->aq_buflen ||
540  alq->aq_flags & AQ_SHUTDOWN ||
541  (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
542  HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
543  ALQ_UNLOCK(alq);
544  return (EWOULDBLOCK);
545  }
546 
547  /*
548  * If we want ordered writes and there is already at least one thread
549  * waiting for resources to become available, sleep until we're woken.
550  */
551  if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
552  KASSERT(!(flags & ALQ_NOWAIT),
553  ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
554  alq->aq_waiters++;
555  msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
556  alq->aq_waiters--;
557  }
558 
559  /*
560  * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
561  * enter while loop and sleep until we have enough free bytes (former)
562  * or skip (latter). If AQ_ORDERED is set, only 1 thread at a time will
563  * be in this loop. Otherwise, multiple threads may be sleeping here
564  * competing for ALQ resources.
565  */
566  while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
567  KASSERT(!(flags & ALQ_NOWAIT),
568  ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
569  alq->aq_flags |= AQ_WANTED;
570  alq->aq_waiters++;
571  if (waitchan)
572  wakeup(waitchan);
573  msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
574  alq->aq_waiters--;
575 
576  /*
577  * If we're the first thread to wake after an AQ_WANTED wakeup
578  * but there isn't enough free space for us, we're going to loop
579  * and sleep again. If there are other threads waiting in this
580  * loop, schedule a wakeup so that they can see if the space
581  * they require is available.
582  */
583  if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
584  alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
585  waitchan = alq;
586  else
587  waitchan = NULL;
588  }
589 
590  /*
591  * If there are waiters, we need to signal the waiting threads after we
592  * complete our work. The alq ptr is used as a wait channel for threads
593  * requiring resources to be freed up. In the AQ_ORDERED case, threads
594  * are not allowed to concurrently compete for resources in the above
595  * while loop, so we use a different wait channel in this case.
596  */
597  if (alq->aq_waiters > 0) {
598  if (alq->aq_flags & AQ_ORDERED)
599  waitchan = &alq->aq_waiters;
600  else
601  waitchan = alq;
602  } else
603  waitchan = NULL;
604 
605  /* Bail if we're shutting down. */
606  if (alq->aq_flags & AQ_SHUTDOWN) {
607  ret = EWOULDBLOCK;
608  goto unlock;
609  }
610 
611  /*
612  * If we need to wrap the buffer to accommodate the write,
613  * we'll need 2 calls to bcopy.
614  */
615  if ((alq->aq_buflen - alq->aq_writehead) < len)
616  copy = alq->aq_buflen - alq->aq_writehead;
617 
618  /* Copy message (or part thereof if wrap required) to the buffer. */
619  bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
620  alq->aq_writehead += copy;
621 
622  if (alq->aq_writehead >= alq->aq_buflen) {
623  KASSERT((alq->aq_writehead == alq->aq_buflen),
624  ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
625  __func__,
626  alq->aq_writehead,
627  alq->aq_buflen));
628  alq->aq_writehead = 0;
629  }
630 
631  if (copy != len) {
632  /*
633  * Wrap the buffer by copying the remainder of our message
634  * to the start of the buffer and resetting aq_writehead.
635  */
636  bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
637  alq->aq_writehead = len - copy;
638  }
639 
640  KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
641  ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));
642 
643  alq->aq_freebytes -= len;
644 
645  if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
646  alq->aq_flags |= AQ_ACTIVE;
647  activate = 1;
648  }
649 
650  KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
651 
652 unlock:
653  ALQ_UNLOCK(alq);
654 
655  if (activate) {
656  ALD_LOCK();
657  ald_activate(alq);
658  ALD_UNLOCK();
659  }
660 
661  /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
662  if (waitchan != NULL)
663  wakeup_one(waitchan);
664 
665  return (ret);
666 }
667 
668 int
669 alq_write(struct alq *alq, void *data, int flags)
670 {
671  /* Should only be called in fixed length message (legacy) mode. */
672  KASSERT((alq->aq_flags & AQ_LEGACY),
673  ("%s: fixed length write on variable length queue", __func__));
674  return (alq_writen(alq, data, alq->aq_entlen, flags));
675 }
676 
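/*
 * Illustrative usage sketch (not part of the original file): a typical
 * consumer of the fixed-length (legacy) API opens a queue sized for a number
 * of fixed-size records, queues records with alq_write(), and closes the
 * queue when done. The record type, path and sizes below are hypothetical;
 * only the alq_* calls and flags come from this file and <sys/alq.h>.
 *
 *	struct my_record rec;	// hypothetical fixed-size log record
 *	struct alq *q;
 *	int error;
 *
 *	error = alq_open(&q, "/var/log/example.alq", curthread->td_ucred,
 *	    0600, sizeof(rec), 1024);
 *	if (error == 0) {
 *		// ALQ_NOWAIT returns EWOULDBLOCK instead of sleeping
 *		// when the buffer has no room for the record.
 *		error = alq_write(q, &rec, ALQ_NOWAIT);
 *		alq_flush(q);	// optionally force pending data out now
 *		alq_close(q);	// flush remaining data and free all resources
 *	}
 */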
677 /*
678  * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
679  */
680 struct ale *
681 alq_getn(struct alq *alq, int len, int flags)
682 {
683  int contigbytes;
684  void *waitchan;
685 
686  KASSERT((len > 0 && len <= alq->aq_buflen),
687  ("%s: len <= 0 || len > alq->aq_buflen", __func__));
688 
689  waitchan = NULL;
690 
691  ALQ_LOCK(alq);
692 
693  /*
694  * Determine the number of free contiguous bytes.
695  * We ensure elsewhere that if aq_writehead == aq_writetail because
696  * the buffer is empty, they will both be set to 0 and therefore
697  * aq_freebytes == aq_buflen and is fully contiguous.
698  * If they are equal and the buffer is not empty, aq_freebytes will
699  * be 0 indicating the buffer is full.
700  */
701  if (alq->aq_writehead <= alq->aq_writetail)
702  contigbytes = alq->aq_freebytes;
703  else {
704  contigbytes = alq->aq_buflen - alq->aq_writehead;
705 
706  if (contigbytes < len) {
707  /*
708  * Insufficient space at end of buffer to handle a
709  * contiguous write. Wrap early if there's space at
710  * the beginning. This will leave a hole at the end
711  * of the buffer which we will have to skip over when
712  * flushing the buffer to disk.
713  */
714  if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
715  /* Keep track of # bytes left blank. */
716  alq->aq_wrapearly = contigbytes;
717  /* Do the wrap and adjust counters. */
718  contigbytes = alq->aq_freebytes =
719  alq->aq_writetail;
720  alq->aq_writehead = 0;
721  }
722  }
723  }
724 
725  /*
726  * Return a NULL ALE if:
727  * - The message is larger than our underlying buffer.
728  * - The ALQ is being shutdown.
729  * - There is insufficient free space in our underlying buffer
730  * to accept the message and the user can't wait for space.
731  * - There is insufficient free space in our underlying buffer
732  * to accept the message and the alq is inactive due to prior
733  * use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
734  */
735  if (len > alq->aq_buflen ||
736  alq->aq_flags & AQ_SHUTDOWN ||
737  (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
738  HAS_PENDING_DATA(alq))) && contigbytes < len)) {
739  ALQ_UNLOCK(alq);
740  return (NULL);
741  }
742 
743  /*
744  * If we want ordered writes and there is already at least one thread
745  * waiting for resources to become available, sleep until we're woken.
746  */
747  if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
748  KASSERT(!(flags & ALQ_NOWAIT),
749  ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
750  alq->aq_waiters++;
751  msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
752  alq->aq_waiters--;
753  }
754 
755  /*
756  * (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either enter
757  * while loop and sleep until we have enough contiguous free bytes
758  * (former) or skip (latter). If AQ_ORDERED is set, only 1 thread at a
759  * time will be in this loop. Otherwise, multiple threads may be
760  * sleeping here competing for ALQ resources.
761  */
762  while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
763  KASSERT(!(flags & ALQ_NOWAIT),
764  ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
765  alq->aq_flags |= AQ_WANTED;
766  alq->aq_waiters++;
767  if (waitchan)
768  wakeup(waitchan);
769  msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
770  alq->aq_waiters--;
771 
772  if (alq->aq_writehead <= alq->aq_writetail)
773  contigbytes = alq->aq_freebytes;
774  else
775  contigbytes = alq->aq_buflen - alq->aq_writehead;
776 
777  /*
778  * If we're the first thread to wake after an AQ_WANTED wakeup
779  * but there isn't enough free space for us, we're going to loop
780  * and sleep again. If there are other threads waiting in this
781  * loop, schedule a wakeup so that they can see if the space
782  * they require is available.
783  */
784  if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
785  contigbytes < len && !(alq->aq_flags & AQ_WANTED))
786  waitchan = alq;
787  else
788  waitchan = NULL;
789  }
790 
791  /*
792  * If there are waiters, we need to signal the waiting threads after we
793  * complete our work. The alq ptr is used as a wait channel for threads
794  * requiring resources to be freed up. In the AQ_ORDERED case, threads
795  * are not allowed to concurrently compete for resources in the above
796  * while loop, so we use a different wait channel in this case.
797  */
798  if (alq->aq_waiters > 0) {
799  if (alq->aq_flags & AQ_ORDERED)
800  waitchan = &alq->aq_waiters;
801  else
802  waitchan = alq;
803  } else
804  waitchan = NULL;
805 
806  /* Bail if we're shutting down. */
807  if (alq->aq_flags & AQ_SHUTDOWN) {
808  ALQ_UNLOCK(alq);
809  if (waitchan != NULL)
810  wakeup_one(waitchan);
811  return (NULL);
812  }
813 
814  /*
815  * If we are here, we have a contiguous number of bytes >= len
816  * available in our buffer starting at aq_writehead.
817  */
818  alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
819  alq->aq_getpost.ae_bytesused = len;
820 
821  return (&alq->aq_getpost);
822 }
823 
824 struct ale *
825 alq_get(struct alq *alq, int flags)
826 {
827  /* Should only be called in fixed length message (legacy) mode. */
828  KASSERT((alq->aq_flags & AQ_LEGACY),
829  ("%s: fixed length get on variable length queue", __func__));
830  return (alq_getn(alq, alq->aq_entlen, flags));
831 }
832 
833 void
834 alq_post_flags(struct alq *alq, struct ale *ale, int flags)
835 {
836  int activate;
837  void *waitchan;
838 
839  activate = 0;
840 
841  if (ale->ae_bytesused > 0) {
842  if (!(alq->aq_flags & AQ_ACTIVE) &&
843  !(flags & ALQ_NOACTIVATE)) {
844  alq->aq_flags |= AQ_ACTIVE;
845  activate = 1;
846  }
847 
848  alq->aq_writehead += ale->ae_bytesused;
849  alq->aq_freebytes -= ale->ae_bytesused;
850 
851  /* Wrap aq_writehead if we filled to the end of the buffer. */
852  if (alq->aq_writehead == alq->aq_buflen)
853  alq->aq_writehead = 0;
854 
855  KASSERT((alq->aq_writehead >= 0 &&
856  alq->aq_writehead < alq->aq_buflen),
857  ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
858  __func__));
859 
860  KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
861  }
862 
863  /*
864  * If there are waiters, we need to signal the waiting threads after we
865  * complete our work. The alq ptr is used as a wait channel for threads
866  * requiring resources to be freed up. In the AQ_ORDERED case, threads
867  * are not allowed to concurrently compete for resources in the
868  * alq_getn() while loop, so we use a different wait channel in this case.
869  */
870  if (alq->aq_waiters > 0) {
871  if (alq->aq_flags & AQ_ORDERED)
872  waitchan = &alq->aq_waiters;
873  else
874  waitchan = alq;
875  } else
876  waitchan = NULL;
877 
878  ALQ_UNLOCK(alq);
879 
880  if (activate) {
881  ALD_LOCK();
882  ald_activate(alq);
883  ALD_UNLOCK();
884  }
885 
886  /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
887  if (waitchan != NULL)
888  wakeup_one(waitchan);
889 }
890 
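/*
 * Illustrative usage sketch (not part of the original file): the zero-copy
 * path pairs alq_getn() with alq_post_flags(). alq_getn() returns with the
 * queue lock held and alq_post_flags() drops it, so the caller simply fills
 * in the returned ALE between the two calls. The payload and length below
 * are hypothetical.
 *
 *	struct ale *ale;
 *
 *	ale = alq_getn(q, len, ALQ_WAITOK);
 *	if (ale != NULL) {
 *		bcopy(payload, ale->ae_data, len);	// write directly into
 *		alq_post_flags(q, ale, 0);		// the queue's buffer
 *	}
 */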
891 void
892 alq_flush(struct alq *alq)
893 {
894  int needwakeup = 0;
895 
896  ALD_LOCK();
897  ALQ_LOCK(alq);
898 
899  /*
900  * Pull the lever iff there is data to flush and we're
901  * not already in the middle of a flush operation.
902  */
903  if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
904  if (alq->aq_flags & AQ_ACTIVE)
905  ald_deactivate(alq);
906 
907  ALD_UNLOCK();
908  needwakeup = alq_doio(alq);
909  } else
910  ALD_UNLOCK();
911 
912  ALQ_UNLOCK(alq);
913 
914  if (needwakeup)
915  wakeup_one(alq);
916 }
917 
918 /*
919  * Flush remaining data, close the file and free all resources.
920  */
921 void
922 alq_close(struct alq *alq)
923 {
924  /* Only flush and destroy alq if not already shutting down. */
925  if (ald_rem(alq) == 0)
926  alq_destroy(alq);
927 }
928 
929 static int
930 alq_load_handler(module_t mod, int what, void *arg)
931 {
932  int ret;
933 
934  ret = 0;
935 
936  switch (what) {
937  case MOD_LOAD:
938  case MOD_SHUTDOWN:
939  break;
940 
941  case MOD_QUIESCE:
942  ALD_LOCK();
943  /* Only allow unload if there are no open queues. */
944  if (LIST_FIRST(&ald_queues) == NULL) {
945  ald_shutingdown = 1;
946  ALD_UNLOCK();
947  EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
948  alq_eventhandler_tag);
949  ald_shutdown(NULL, 0);
950  mtx_destroy(&ald_mtx);
951  } else {
952  ALD_UNLOCK();
953  ret = EBUSY;
954  }
955  break;
956 
957  case MOD_UNLOAD:
958  /* If MOD_QUIESCE failed we must fail here too. */
959  if (ald_shutingdown == 0)
960  ret = EBUSY;
961  break;
962 
963  default:
964  ret = EINVAL;
965  break;
966  }
967 
968  return (ret);
969 }
970 
971 static moduledata_t alq_mod =
972 {
973  "alq",
975  NULL
976 };
977 
978 DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
979 MODULE_VERSION(alq, 1);