libfuse
fuse_loop_mt.c
1 /*
2  FUSE: Filesystem in Userspace
3  Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
4 
5  Implementation of the multi-threaded FUSE session loop.
6 
7  This program can be distributed under the terms of the GNU LGPLv2.
8  See the file COPYING.LIB.
9 */
10 
11 #include "config.h"
12 #include "fuse_lowlevel.h"
13 #include "fuse_misc.h"
14 #include "fuse_kernel.h"
15 #include "fuse_i.h"
16 
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <unistd.h>
21 #include <signal.h>
22 #include <semaphore.h>
23 #include <errno.h>
24 #include <sys/time.h>
25 #include <sys/ioctl.h>
26 #include <assert.h>
27 
28 /* Environment var controlling the thread stack size */
29 #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
30 
31 struct fuse_worker {
32  struct fuse_worker *prev;
33  struct fuse_worker *next;
34  pthread_t thread_id;
35  size_t bufsize;
36  struct fuse_buf fbuf;
37  struct fuse_chan *ch;
38  struct fuse_mt *mt;
39 };
40 
41 struct fuse_mt {
42  pthread_mutex_t lock;
43  int numworker;
44  int numavail;
45  struct fuse_session *se;
46  struct fuse_worker main;
47  sem_t finish;
48  int exit;
49  int error;
50  int clone_fd;
51  int max_idle;
52 };
53 
54 static struct fuse_chan *fuse_chan_new(int fd)
55 {
56  struct fuse_chan *ch = (struct fuse_chan *) malloc(sizeof(*ch));
57  if (ch == NULL) {
58  fprintf(stderr, "fuse: failed to allocate channel\n");
59  return NULL;
60  }
61 
62  memset(ch, 0, sizeof(*ch));
63  ch->fd = fd;
64  ch->ctr = 1;
65  fuse_mutex_init(&ch->lock);
66 
67  return ch;
68 }
69 
70 struct fuse_chan *fuse_chan_get(struct fuse_chan *ch)
71 {
72  assert(ch->ctr > 0);
73  pthread_mutex_lock(&ch->lock);
74  ch->ctr++;
75  pthread_mutex_unlock(&ch->lock);
76 
77  return ch;
78 }
79 
80 void fuse_chan_put(struct fuse_chan *ch)
81 {
82  if (ch == NULL)
83  return;
84  pthread_mutex_lock(&ch->lock);
85  ch->ctr--;
86  if (!ch->ctr) {
87  pthread_mutex_unlock(&ch->lock);
88  close(ch->fd);
89  pthread_mutex_destroy(&ch->lock);
90  free(ch);
91  } else
92  pthread_mutex_unlock(&ch->lock);
93 }
94 
95 static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next)
96 {
97  struct fuse_worker *prev = next->prev;
98  w->next = next;
99  w->prev = prev;
100  prev->next = w;
101  next->prev = w;
102 }
103 
104 static void list_del_worker(struct fuse_worker *w)
105 {
106  struct fuse_worker *prev = w->prev;
107  struct fuse_worker *next = w->next;
108  prev->next = next;
109  next->prev = prev;
110 }
111 
112 static int fuse_loop_start_thread(struct fuse_mt *mt);
113 
114 static void *fuse_do_work(void *data)
115 {
116  struct fuse_worker *w = (struct fuse_worker *) data;
117  struct fuse_mt *mt = w->mt;
118 
119  while (!fuse_session_exited(mt->se)) {
120  int isforget = 0;
121  int res;
122 
123  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
124  res = fuse_session_receive_buf_int(mt->se, &w->fbuf, w->ch);
125  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
126  if (res == -EINTR)
127  continue;
128  if (res <= 0) {
129  if (res < 0) {
130  fuse_session_exit(mt->se);
131  mt->error = res;
132  }
133  break;
134  }
135 
136  pthread_mutex_lock(&mt->lock);
137  if (mt->exit) {
138  pthread_mutex_unlock(&mt->lock);
139  return NULL;
140  }
141 
142  /*
143  * This disgusting hack is needed so that zillions of threads
144  * are not created on a burst of FORGET messages
145  */
146  if (!(w->fbuf.flags & FUSE_BUF_IS_FD)) {
147  struct fuse_in_header *in = w->fbuf.mem;
148 
149  if (in->opcode == FUSE_FORGET ||
150  in->opcode == FUSE_BATCH_FORGET)
151  isforget = 1;
152  }
153 
154  if (!isforget)
155  mt->numavail--;
156  if (mt->numavail == 0)
157  fuse_loop_start_thread(mt);
158  pthread_mutex_unlock(&mt->lock);
159 
160  fuse_session_process_buf_int(mt->se, &w->fbuf, w->ch);
161 
162  pthread_mutex_lock(&mt->lock);
163  if (!isforget)
164  mt->numavail++;
165  if (mt->numavail > mt->max_idle) {
166  if (mt->exit) {
167  pthread_mutex_unlock(&mt->lock);
168  return NULL;
169  }
170  list_del_worker(w);
171  mt->numavail--;
172  mt->numworker--;
173  pthread_mutex_unlock(&mt->lock);
174 
175  pthread_detach(w->thread_id);
176  free(w->fbuf.mem);
177  fuse_chan_put(w->ch);
178  free(w);
179  return NULL;
180  }
181  pthread_mutex_unlock(&mt->lock);
182  }
183 
184  sem_post(&mt->finish);
185 
186  return NULL;
187 }
188 
189 int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
190 {
191  sigset_t oldset;
192  sigset_t newset;
193  int res;
194  pthread_attr_t attr;
195  char *stack_size;
196 
197  /* Override default stack size */
198  pthread_attr_init(&attr);
199  stack_size = getenv(ENVNAME_THREAD_STACK);
200  if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
201  fprintf(stderr, "fuse: invalid stack size: %s\n", stack_size);
202 
203  /* Disallow signal reception in worker threads */
204  sigemptyset(&newset);
205  sigaddset(&newset, SIGTERM);
206  sigaddset(&newset, SIGINT);
207  sigaddset(&newset, SIGHUP);
208  sigaddset(&newset, SIGQUIT);
209  pthread_sigmask(SIG_BLOCK, &newset, &oldset);
210  res = pthread_create(thread_id, &attr, func, arg);
211  pthread_sigmask(SIG_SETMASK, &oldset, NULL);
212  pthread_attr_destroy(&attr);
213  if (res != 0) {
214  fprintf(stderr, "fuse: error creating thread: %s\n",
215  strerror(res));
216  return -1;
217  }
218 
219  return 0;
220 }
221 
222 static struct fuse_chan *fuse_clone_chan(struct fuse_mt *mt)
223 {
224  int res;
225  int clonefd;
226  uint32_t masterfd;
227  struct fuse_chan *newch;
228  const char *devname = "/dev/fuse";
229 
230 #ifndef O_CLOEXEC
231 #define O_CLOEXEC 0
232 #endif
233  clonefd = open(devname, O_RDWR | O_CLOEXEC);
234  if (clonefd == -1) {
235  fprintf(stderr, "fuse: failed to open %s: %s\n", devname,
236  strerror(errno));
237  return NULL;
238  }
239  fcntl(clonefd, F_SETFD, FD_CLOEXEC);
240 
241  masterfd = mt->se->fd;
242  res = ioctl(clonefd, FUSE_DEV_IOC_CLONE, &masterfd);
243  if (res == -1) {
244  fprintf(stderr, "fuse: failed to clone device fd: %s\n",
245  strerror(errno));
246  close(clonefd);
247  return NULL;
248  }
249  newch = fuse_chan_new(clonefd);
250  if (newch == NULL)
251  close(clonefd);
252 
253  return newch;
254 }
255 
256 static int fuse_loop_start_thread(struct fuse_mt *mt)
257 {
258  int res;
259 
260  struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
261  if (!w) {
262  fprintf(stderr, "fuse: failed to allocate worker structure\n");
263  return -1;
264  }
265  memset(w, 0, sizeof(struct fuse_worker));
266  w->fbuf.mem = NULL;
267  w->mt = mt;
268 
269  w->ch = NULL;
270  if (mt->clone_fd) {
271  w->ch = fuse_clone_chan(mt);
272  if(!w->ch) {
273  /* Don't attempt this again */
274  fprintf(stderr, "fuse: trying to continue "
275  "without -o clone_fd.\n");
276  mt->clone_fd = 0;
277  }
278  }
279 
280  res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
281  if (res == -1) {
282  fuse_chan_put(w->ch);
283  free(w);
284  return -1;
285  }
286  list_add_worker(w, &mt->main);
287  mt->numavail ++;
288  mt->numworker ++;
289 
290  return 0;
291 }
292 
293 static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
294 {
295  pthread_join(w->thread_id, NULL);
296  pthread_mutex_lock(&mt->lock);
297  list_del_worker(w);
298  pthread_mutex_unlock(&mt->lock);
299  free(w->fbuf.mem);
300  fuse_chan_put(w->ch);
301  free(w);
302 }
303 
304 FUSE_SYMVER(".symver fuse_session_loop_mt_32,fuse_session_loop_mt@@FUSE_3.2");
305 int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config *config)
306 {
307  int err;
308  struct fuse_mt mt;
309  struct fuse_worker *w;
310 
311  memset(&mt, 0, sizeof(struct fuse_mt));
312  mt.se = se;
313  mt.clone_fd = config->clone_fd;
314  mt.error = 0;
315  mt.numworker = 0;
316  mt.numavail = 0;
317  mt.max_idle = config->max_idle_threads;
318  mt.main.thread_id = pthread_self();
319  mt.main.prev = mt.main.next = &mt.main;
320  sem_init(&mt.finish, 0, 0);
321  fuse_mutex_init(&mt.lock);
322 
323  pthread_mutex_lock(&mt.lock);
324  err = fuse_loop_start_thread(&mt);
325  pthread_mutex_unlock(&mt.lock);
326  if (!err) {
327  /* sem_wait() is interruptible */
328  while (!fuse_session_exited(se))
329  sem_wait(&mt.finish);
330 
331  pthread_mutex_lock(&mt.lock);
332  for (w = mt.main.next; w != &mt.main; w = w->next)
333  pthread_cancel(w->thread_id);
334  mt.exit = 1;
335  pthread_mutex_unlock(&mt.lock);
336 
337  while (mt.main.next != &mt.main)
338  fuse_join_worker(&mt, mt.main.next);
339 
340  err = mt.error;
341  }
342 
343  pthread_mutex_destroy(&mt.lock);
344  sem_destroy(&mt.finish);
345  if(se->error != 0)
346  err = se->error;
347  fuse_session_reset(se);
348  return err;
349 }
350 
351 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd);
352 FUSE_SYMVER(".symver fuse_session_loop_mt_31,fuse_session_loop_mt@FUSE_3.0");
353 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd)
354 {
355  struct fuse_loop_config config;
356  config.clone_fd = clone_fd;
357  config.max_idle_threads = 10;
358  return fuse_session_loop_mt_32(se, &config);
359 }
void fuse_session_reset(struct fuse_session *se)
unsigned int max_idle_threads
Definition: fuse_common.h:103
int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd)
Definition: fuse_loop_mt.c:353
int fuse_session_exited(struct fuse_session *se)
void fuse_session_exit(struct fuse_session *se)