Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions daemon/com.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ typedef struct ha_common_object
MTC_U32 in_use;
MTC_U32 ref_count;
MTC_U32 checksum; // to detect modification by reader
pthread_mutex_t thread_id_record_table_mutex;
THREAD_ID_RECORD thread_id_record_table[THREAD_ID_RECORD_NUM];
} HA_COMMON_OBJECT;

Expand Down Expand Up @@ -537,6 +538,7 @@ set_thread_id_record(
clock_gettime(CLOCK_MONOTONIC, &ts);
now = tstoms(ts);

pthread_mutex_lock(&object->thread_id_record_table_mutex);
switch (lock_state) {
case LOCK_STATE_READER_ACQUIREING:
case LOCK_STATE_WRITER_ACQUIREING:
Expand All @@ -551,6 +553,7 @@ set_thread_id_record(
object->thread_id_record_table[i].lock_state = lock_state;
object->thread_id_record_table[i].thread_id = self;
object->thread_id_record_table[i].changed_time = now;
pthread_mutex_unlock(&object->thread_id_record_table_mutex);
return;
}
}
Expand All @@ -571,12 +574,14 @@ set_thread_id_record(
//
object->thread_id_record_table[i].lock_state = lock_state;
object->thread_id_record_table[i].changed_time = now;
pthread_mutex_unlock(&object->thread_id_record_table_mutex);
return;
}
}
log_message(MTC_LOG_WARNING, "COM: thread_id %lu not found in thread_id_record_table.\n", self);
break;
}
pthread_mutex_unlock(&object->thread_id_record_table_mutex);
assert(FALSE);
return ;
}
Expand Down Expand Up @@ -755,6 +760,13 @@ com_create(
handle = (HA_COMMON_OBJECT_HANDLE_INTERNAL *) *object_handle;
object->ref_count ++;
handle->object->in_use++;
pthread_ret = pthread_mutex_init(&handle->object->thread_id_record_table_mutex, NULL);
if (pthread_ret != 0)
{
log_internal(MTC_LOG_ERR, "COM: (%s) pthread_mutex_init failed (sys %d).\n", __func__, pthread_ret);
ret = MTC_ERROR_COM_PTHREAD;
goto error_return;
}
set_thread_id_record(handle->object, LOCK_STATE_WRITER_ACQUIREING);
LEAVE_CS;

Expand All @@ -770,6 +782,7 @@ com_create(

pthread_ret = pthread_rwlock_wrlock(&handle->object->rwlock);
if (fist_on("com.pthread")) pthread_ret = FIST_PTHREAD_ERRCODE;


ENTER_CS;
set_thread_id_record(handle->object, LOCK_STATE_WRITER_ACQUIRED);
Expand Down Expand Up @@ -938,6 +951,11 @@ com_close(
{
log_message(MTC_LOG_WARNING, "COM: pthread_rwlock_destroy failed (sys %d).\n", pthread_ret);
}
pthread_ret = pthread_mutex_destroy(&object->thread_id_record_table_mutex);
if (pthread_ret != 0)
{
log_message(MTC_LOG_WARNING, "COM: pthread_destroy failed (sys %d).\n", pthread_ret);
}
free_object(object);
}
error_return:
Expand Down Expand Up @@ -1098,20 +1116,19 @@ com_writer_lock(
HA_COMMON_OBJECT_HANDLE_INTERNAL *handle = object_handle;
int pthread_ret;

ENTER_CS;
if (!valid_object_handle(handle))
{
log_internal(MTC_LOG_ERR, "COM: (%s) invalid handle.\n", __func__);
assert(FALSE);
ret = MTC_ERROR_COM_INVALID_HANDLE;
goto error_return;
}
ENTER_CS;
handle->object->in_use++;
set_thread_id_record(handle->object, LOCK_STATE_WRITER_ACQUIREING);
LEAVE_CS;
set_thread_id_record(handle->object, LOCK_STATE_WRITER_ACQUIREING);
pthread_ret = pthread_rwlock_wrlock(&handle->object->rwlock);
if (fist_on("com.pthread")) pthread_ret = FIST_PTHREAD_ERRCODE;
ENTER_CS;
set_thread_id_record(handle->object, LOCK_STATE_WRITER_ACQUIRED);
if (pthread_ret != 0)
{
Expand All @@ -1123,7 +1140,6 @@ com_writer_lock(
*buffer = handle->object->buffer;

error_return:
LEAVE_CS;
if (ret != MTC_SUCCESS)
{
log_status(ret, NULL);
Expand Down Expand Up @@ -1158,7 +1174,6 @@ com_writer_unlock(
int pthread_ret;
HA_COMMON_OBJECT_CALLBACK_LIST_ITEM *c;

ENTER_CS;
if (!valid_object_handle(handle))
{
log_internal(MTC_LOG_ERR, "COM: (%s) invalid handle.\n", __func__);
Expand All @@ -1167,6 +1182,7 @@ com_writer_unlock(
goto error_return;
}

ENTER_CS;
for (c = handle->object->callback_list_head; c != NULL; c = c->next)
{
c->func(c->object_handle, handle->object->buffer);
Expand All @@ -1177,6 +1193,8 @@ com_writer_unlock(


handle->object->in_use--;
LEAVE_CS;

set_thread_id_record(handle->object, LOCK_STATE_NONE);
pthread_ret = pthread_rwlock_unlock(&handle->object->rwlock);
if (fist_on("com.pthread")) pthread_ret = FIST_PTHREAD_ERRCODE;
Expand All @@ -1188,7 +1206,6 @@ com_writer_unlock(
}

error_return:
LEAVE_CS;
if (ret != MTC_SUCCESS)
{
log_status(ret, NULL);
Expand Down Expand Up @@ -1225,20 +1242,19 @@ com_reader_lock(
HA_COMMON_OBJECT_HANDLE_INTERNAL *handle = object_handle;
int pthread_ret;

ENTER_CS;
if (!valid_object_handle(handle))
{
log_internal(MTC_LOG_ERR, "COM: (%s) invalid handle.\n", __func__);
assert(FALSE);
ret = MTC_ERROR_COM_INVALID_HANDLE;
goto error_return;
}
ENTER_CS;
handle->object->in_use++;
set_thread_id_record(handle->object, LOCK_STATE_READER_ACQUIREING);
LEAVE_CS;
set_thread_id_record(handle->object, LOCK_STATE_READER_ACQUIREING);
pthread_ret = pthread_rwlock_rdlock(&handle->object->rwlock);
if (fist_on("com.pthread")) pthread_ret = FIST_PTHREAD_ERRCODE;
ENTER_CS;
set_thread_id_record(handle->object, LOCK_STATE_READER_ACQUIRED);
if (pthread_ret != 0)
{
Expand All @@ -1249,7 +1265,6 @@ com_reader_lock(
*buffer = handle->object->buffer;

error_return:
LEAVE_CS;
if (ret != MTC_SUCCESS)
{
log_status(ret, NULL);
Expand Down Expand Up @@ -1282,7 +1297,6 @@ com_reader_unlock(
HA_COMMON_OBJECT_HANDLE_INTERNAL *handle = object_handle;
int pthread_ret;

ENTER_CS;
if (!valid_object_handle(handle))
{
log_internal(MTC_LOG_ERR, "COM: (%s) invalid handle.\n", __func__);
Expand All @@ -1291,10 +1305,12 @@ com_reader_unlock(
goto error_return;
}

ENTER_CS;
#ifndef NDEBUG
assert(handle->object->checksum == calc_checksum_object_buffer(handle->object));
#endif //NDEBUG
handle->object->in_use--;
LEAVE_CS;
set_thread_id_record(handle->object, LOCK_STATE_NONE);
pthread_ret = pthread_rwlock_unlock(&handle->object->rwlock);
if (fist_on("com.pthread")) pthread_ret = FIST_PTHREAD_ERRCODE;
Expand All @@ -1306,7 +1322,6 @@ com_reader_unlock(
}

error_return:
LEAVE_CS;
if (ret != MTC_SUCCESS)
{
log_status(ret, NULL);
Expand Down
24 changes: 18 additions & 6 deletions daemon/sm.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <signal.h>
Expand Down Expand Up @@ -2819,6 +2820,8 @@ check_sigs(
return signaled;
}

#define NSEC_PER_SEC 1000000000

MTC_STATIC MTC_BOOLEAN
sm_wait_signals_sm_hb_sf(
MTC_BOOLEAN sm_sig,
Expand All @@ -2827,26 +2830,35 @@ sm_wait_signals_sm_hb_sf(
MTC_CLOCK timeout)
{
MTC_BOOLEAN signaled;
MTC_CLOCK start = _getms();

struct timespec deadline;

if (timeout == 0)
{
return FALSE;
}

struct timespec timeout_ts = mstots(timeout);
if (clock_gettime(CLOCK_REALTIME, &deadline) < 0) {
log_message(MTC_LOG_WARNING, "clock_gettime failed (sys %d)", errno);
timeout = -1;
}
deadline.tv_nsec += timeout_ts.tv_nsec;
deadline.tv_sec += timeout_ts.tv_sec;
deadline.tv_sec += deadline.tv_nsec / NSEC_PER_SEC;
deadline.tv_nsec %= NSEC_PER_SEC;

pthread_mutex_lock(&smvar.mutex);
while (!(signaled = check_sigs(sm_sig, hb_sig, sf_sig)) &&
((timeout < 0)? TRUE: (_getms() - start < timeout)))
while (!(signaled = check_sigs(sm_sig, hb_sig, sf_sig)))
{
if (timeout < 0)
{
pthread_cond_wait(&smvar.cond, &smvar.mutex);
}
else
{
pthread_mutex_unlock(&smvar.mutex);
mssleep(100);
pthread_mutex_lock(&smvar.mutex);
if (pthread_cond_timedwait(&smvar.cond, &smvar.mutex, &deadline) == ETIMEDOUT)
break;
}
}
pthread_mutex_unlock(&smvar.mutex);
Expand Down
Loading