From patchwork Fri Jul 29 07:00:33 2016
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Yusheng Wang
X-Patchwork-Id: 654010
From: Yusheng Wang
To: "dev@openvswitch.org"
Date: Fri, 29 Jul 2016 07:00:33 +0000
Subject: [ovs-dev] [PATCH V2] OVN: Initial patch of datalog engine

From bc291ad6a1ec7f1e05455d36be05c655c61b243d Mon Sep 17 00:00:00 2001
From: Yusheng Wang
Date: Fri, 29 Jul 2016 12:26:49 +0800
Subject: [PATCH V2] OVN: Initial patch of datalog engine

Main changes:
* Man page and code in the same patch.
* Use prefix dtlog_ consistently in code.
* Enhanced serialize method.

Thanks to Ryan and Flaviof for comments.

Signed-off-by: Yusheng Wang
---
 ovn/lib/automake.mk       |    7 +
 ovn/lib/datalog-private.h |  864 +++++++++++
 ovn/lib/datalog.c         | 3759 +++++++++++++++++++++++++++++++++++++++++++++
 ovn/lib/datalog.h         |   57 +
 ovn/lib/ovn-datalog.7.xml |  493 ++++++
 tests/automake.mk         |    2 +
 tests/datalog.at          |   11 +
 tests/test-datalog.c      | 1824 ++++++++++++++++++++++
 tests/testsuite.at        |    1 +
 9 files changed, 7018 insertions(+)
 create mode 100644 ovn/lib/datalog-private.h
 create mode 100644 ovn/lib/datalog.c
 create mode 100644 ovn/lib/datalog.h
 create mode 100644 ovn/lib/ovn-datalog.7.xml
 create mode 100644 tests/datalog.at
 create mode 100644 tests/test-datalog.c

diff --git a/ovn/lib/automake.mk b/ovn/lib/automake.mk
index 4e9daf5..ceb21b3 100644
--- a/ovn/lib/automake.mk
+++ b/ovn/lib/automake.mk
@@ -10,6 +10,9 @@ ovn_lib_libovn_la_SOURCES = \
 	ovn/lib/ovn-dhcp.h \
 	ovn/lib/ovn-util.c \
 	ovn/lib/ovn-util.h \
+	ovn/lib/datalog.h \
+	ovn/lib/datalog-private.h \
+	ovn/lib/datalog.c \
 	ovn/lib/logical-fields.h
 nodist_ovn_lib_libovn_la_SOURCES = \
 	ovn/lib/ovn-nb-idl.c \
@@ -43,3 +46,7 @@ ovn/lib/ovn-nb-idl.ovsidl: $(OVN_NB_IDL_FILES)
 	$(AM_V_GEN)$(OVSDB_IDLC) annotate $(OVN_NB_IDL_FILES) > $@.tmp && \
 	mv $@.tmp $@
 
+man_MANS += ovn/lib/ovn-datalog.7
+EXTRA_DIST += ovn/lib/ovn-datalog.7.xml
+DISTCLEANFILES += ovn/lib/ovn-datalog.7
+
diff --git a/ovn/lib/datalog-private.h b/ovn/lib/datalog-private.h
new file mode 100644
index 0000000..5140c51
--- /dev/null
+++ b/ovn/lib/datalog-private.h
@@ -0,0 +1,864 @@
+/* Copyright (c) 2016 VMware, Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVN_DATALOG_PRIV_H
+#define OVN_DATALOG_PRIV_H 1
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#ifndef USE_OUTSIDE_OVS
+#include "util.h"
+#define dtlog_assert(c) ovs_assert(c)
+#else
+/* Include assert header file here. */
+#define dtlog_assert(c) assert(c)
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* --------------------------------------------------------------------------
+ * CONSTANTS AND MACROS
+ * --------------------------------------------------------------------------
+ */
+
+enum dtlog_type {
+    DTLOG_T_UNKNOWN,
+    DTLOG_T_ARRAY,          /* Array. */
+    DTLOG_T_MAP,            /* Map. */
+    DTLOG_T_SET,            /* Set. */
+    DTLOG_T_INDEX,          /* Index of tuple. */
+                            /* The collection type must be */
+                            /* (UNKNOWN, UNKNOWN, INDEX). */
+
+    DTLOG_T_INT32,          /* 32 bit integer. */
+    DTLOG_T_TST_INT32,      /* 32 bit integer for testing hash map. */
+    DTLOG_T_STR,            /* Null terminated string. */
+    DTLOG_T_VALUE,          /* dtlog_value_t */
+    DTLOG_T_INT_TUPLE,      /* dtlog_ints_t */
+    DTLOG_T_BITSET,         /* dtlog_bits_t */
+    DTLOG_T_BUF,            /* dtlog_buf_t */
+    DTLOG_T_TUPLE,          /* dtlog_tuple_t */
+    DTLOG_T_TABLE,          /* dtlog_table_t */
+    DTLOG_T_RULE,           /* dtlog_rule_t */
+    DTLOG_T_RULE_SET,       /* dtlog_rule_set_t */
+    DTLOG_T_LOG_ENG,        /* dtlog_engine_t */
+    DTLOG_T_JOIN_PARAM,     /* dtlog_join_param_t */
+};
+
+/* A collection is defined by its key type, value type, and collection type,
+ * which could be array, map, set, or index. Index is a special hash map
+ * used only with tuples of table. */
+
+#define DTLOG_KEY(id)       (id)
+#define DTLOG_KTYPE(t)      ((t) & 0xff)
+#define DTLOG_VALUE(id)     ((id) << 8)
+#define DTLOG_VTYPE(t)      (((t) >> 8) & 0xff)
+#define DTLOG_COLL(id)      ((id) << 16)
+#define DTLOG_CTYPE(t)      (((t) >> 16) & 0xff) /* Collection type. */
+
+/* This identifies integer to value map. */
+#define DTLOG_MAP_I2V (DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_VALUE))
+
+/* Conversion between pointer and int32_t. Type cast will produce warning. */
+#define dtlog_p2i(v) ((int32_t)(((union { intptr_t i; void* p; })(v)).i))
+#define dtlog_i2p(v) (((union { intptr_t i; void* p; })(intptr_t)(v)).p)
+
+/* This helps show the type of a collection variable. */
+#define DTLOG_T(t, def)         t /* For array or set. */
+#define DTLOG_T2(t, key, val)   t /* For map. */
+
+#define DTLOG_SZ_LOG_TOKEN      1024 /* Max length of token or literal. */
+#define DTLOG_SZ_INIT_BITSET    16   /* Initial capacity of bitset. */
+#define DTLOG_SZ_INIT_ARRAY     16   /* Initial capacity of array. */
+#define DTLOG_SZ_INIT_HASH      11   /* Initial capacity of hash map. */
+
+#define DTLOG_LOG_COMP          0    /* 0 disables logging. */
+
+/* --------------------------------------------------------------------------
+ * BITSET, ARRAY, HASH MAP, AND HASH SET
+ * --------------------------------------------------------------------------
+ */
+
+/* When type is known for a collection, it could be freed using
+ * _free, and all nested structure will be freed. Items never
+ * make cross reference by default, except for dtlog_value_t. When there is
+ * cross reference, collection type could be set to unknown for all but
+ * one collection variable to prevent multiple free. 'global value' is the
+ * set for all string literals. 'dtlog_meta_t' must be the first field
+ * for collections. 'hash_code' must be the first field for struct of key
+ * in a hash map or hash set.
+ *
+ * Naming convention for collection function: _:
+ * Collection could be: bitset, array, set, or map.
+ * Method could be: (init, free), (add, del), (ins, rmv), (get, set).
+ * (ins, rmv) is position based operation for array.
+ */
+
+struct dtlog_hash_s;
+
+typedef struct dtlog_meta_s {
+    int32_t type;       /* Must be the first field. */
+    bool alloc;         /* Indicates if the structure is from malloc. */
+    struct dtlog_hash_s* glb_values;
+} dtlog_meta_t;
+
+typedef struct dtlog_bits_s {
+    dtlog_meta_t m;     /* Must be the first field. */
+    int32_t size;       /* Length of 'items' allocated. */
+    uint32_t* items;
+} dtlog_bits_t;
+
+typedef struct dtlog_ary_s {
+    dtlog_meta_t m;
+    void** item;
+    int32_t size;       /* Size of the array. */
+    int32_t len;        /* Length of allocated items. */
+} dtlog_ary_t;
+
+typedef struct dtlog_map_node_s {
+    struct dtlog_map_node_s* next;  /* Must be the first field. */
+    void* key;
+    void* value;
+} dtlog_map_node_t;
+
+/* set_node_s, index_node_s must have the same structure as map_node_s
+ * for the first two fields. Assume they have the same alignment, and
+ * map_node_s is used to access both.
+ */
+typedef struct dtlog_set_node_s {
+    struct dtlog_set_node_s* next;
+    void* key;
+} dtlog_set_node_t;
+
+/* For index map, the aux points to the dtlog_ints_t which defines the
+ * index. The key of each entry points to one tuple of the tuple set
+ * whose elements share the same tuple key. The set is represented by
+ * the doubly linked list stored in the tuple.
+ */
+typedef struct dtlog_index_node_s {
+    struct dtlog_index_node_s* next;
+    void* key;          /* Points to one tuple of the set. */
+    int32_t hash_code;  /* hash_code of the tuple key. */
+} dtlog_index_node_t;
+
+typedef struct dtlog_hash_s {
+    dtlog_meta_t m;
+    dtlog_map_node_t** bucket;
+    int32_t size;       /* Size of items. */
+    int32_t len;        /* Length of bucket array. */
+    void* aux;          /* Extra data. */
+} dtlog_hash_t;
+
+typedef dtlog_hash_t dtlog_map_t;
+typedef dtlog_hash_t dtlog_set_t;
+
+/* --------------------------------------------------------------------------
+ * DATA STRUCTURE OF LOG ENGINE
+ * --------------------------------------------------------------------------
+ */
+
+typedef struct dtlog_config_s {
+    char sep1;          /* Field separator. */
+    char sep2;          /* Record separator. */
+    char esc;           /* Escape character. */
+} dtlog_config_t;
+
+extern dtlog_config_t dtlog_config;
+
+typedef struct dtlog_value_s {  /* Variable size structure. */
+    int32_t hash_code;  /* Must be first field. */
+    int32_t ref_no;     /* Number of references. */
+    /* The actual size is abs(size) and it does not count terminating 0. */
+    /* 0 is always padded no matter .a or .p is used for printing value. */
+    int32_t size;       /* size < 0 indicates using value.p */
+                        /* size >= 0 indicates using value.a */
+    union {
+        char a[0];      /* Byte array, need not terminate with null. */
+        char* p;        /* Only used in populating the value set. */
+    } value;
+} dtlog_value_t;
+
+/*
+ * dtlog_tuple_t.indexes is a dtlog_tuple_t pointer array, which represents
+ * an array of doubly linked lists. For index i, indexes[i * 2] points to
+ * the previous node and indexes[i * 2 + 1] points to the successor node.
+ */
+typedef struct dtlog_tuple_s {  /* Variable size structure. */
+    int32_t hash_code;  /* Must be first field. */
+    int32_t n_values;   /* Duplicates table.num_fields. */
+    int64_t count;      /* Valid only in table. When tuple is outside */
+                        /* of table, it has different meaning. */
+    struct dtlog_tuple_s** indexes; /* Valid only in table. */
+    dtlog_value_t* values[0];       /* Field array of the tuple. */
+} dtlog_tuple_t;
+
+typedef struct dtlog_ints_s {   /* Variable size structure. */
+    int32_t hash_code;  /* Must be first field. */
+    int32_t n_items;    /* Number of integers */
+    int32_t values[0];
+} dtlog_ints_t;
+
+struct dtlog_engine_s;
+
+/* index_map, index_def and tuple.indexes must have the same size,
+ * and align to each other, i.e. assume there are N indexes defined,
+ * tuple.indexes has N * 2 items, and index_map and index_def
+ * have N items. For index j:
+ * (1) index_def[key_def] -> j, defines the index, e.g., '0:3:4'->1.
+ * (2) index_map[j] defines hash map from key tuples to tuple set.
+ *     The aux of this map points to corresponding index_def's key.
+ * (3) tuple.indexes[j * 2] and tuple.indexes[j * 2 + 1] define the tuple set.
+ */
+typedef struct dtlog_table_s {
+    dtlog_meta_t m;
+    int32_t table_index;    /* Id of the table. */
+    int32_t num_fields;     /* Also present in each tuple. */
+    bool is_remove;         /* Valid if this is delta. */
+
+    DTLOG_T2(dtlog_map_t, dtlog_ints_t*, int32_t) index_def;
+    DTLOG_T(dtlog_ary_t, dtlog_hash_t*) index_map;
+    DTLOG_T(dtlog_set_t, dtlog_tuple_t*) tuples;
+} dtlog_table_t;
+
+typedef struct dtlog_buf_s {
+    dtlog_meta_t m;
+    int32_t pos;        /* Next write position. */
+    int32_t aux;        /* Extra data associated with the buffer. */
+    int32_t size;       /* Size of buffer. */
+    char* buf;
+} dtlog_buf_t;
+
+typedef struct dtlog_rule_s {
+    dtlog_meta_t m;
+
+    bool is_union;
+
+    /* Table index starts from 0. Item 0 is for left side.
+     * Example X1 : X2, X3 -> 7, 3, 6.
+     * Rule and param correspond to each other, i.e., param[i] defines
+     * parameter list for rule[i].
+     */
+    DTLOG_T(dtlog_ary_t, int32_t) rule;
+
+    /* Item 0 is for left side. Parameter index starts from 0.
+     * Example: X1(a,b) : X2(a, -), X3('c', b) -> ((0, 1), (0, -1), (-2, 1)).
+     * -1 indicates 'ignore', -2 is the index for the first constant.
+     */
+    DTLOG_T(dtlog_ary_t, dtlog_ary_t*) param; /* Array of array of int. */
+
+    /* Example: -2 -> 'c', map from integer to value. */
+    DTLOG_T2(dtlog_map_t, int32_t, dtlog_value_t*) const_param;
+
+    /* Example: 0->'a', 1->'b'. */
+    DTLOG_T2(dtlog_map_t, int32_t, dtlog_value_t*) param_name_map;
+} dtlog_rule_t;
+
+typedef struct dtlog_rule_set_s {
+    dtlog_meta_t m;
+
+    /* Example: 7-> 'X1'. */
+    DTLOG_T2(dtlog_map_t, int32_t, dtlog_value_t*) rule_name_map;
+    DTLOG_T2(dtlog_map_t, dtlog_value_t*, int32_t) rule_index_map;
+
+    /* Table index -> rule. Table index starts from 0. */
+    DTLOG_T2(dtlog_map_t, int32_t, dtlog_rule_t*) rules;
+
+    /* Table index -> table index array. For X1 : X2, X3 -> 7, 3, 6,
+     * The mapping is 3->(7), 6->(7). List is ordered.
+     */
+    DTLOG_T2(dtlog_map_t, int32_t, dtlog_ary_t*) table_rule_map;
+
+    DTLOG_T(dtlog_set_t, int32_t) input_tables;
+    DTLOG_T(dtlog_set_t, int32_t) output_tables;
+    DTLOG_T2(dtlog_map_t, int32_t, int32_t) param_size;
+} dtlog_rule_set_t;
+
+typedef struct dtlog_io_s {
+
+    DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_remove;
+    DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_insert;
+    DTLOG_T(dtlog_ary_t*, dtlog_table_t*) res;
+
+    int32_t clm_idx;    /* This identifies current field. */
+    int32_t tbl_idx;    /* Index in array of 'res'. */
+
+    dtlog_tuple_t* cur_tuple;
+    bool cur_tbl_is_remove;
+    int32_t cur_tbl_idx;
+    int32_t cur_tbl_n_fields;
+
+    dtlog_table_t* cur_tbl;
+    dtlog_map_node_t* hash_pre; /* Previous node of iterator. */
+    int32_t hash_b;             /* Current bucket of iterator. */
+} dtlog_io_t;
+
+typedef struct dtlog_engine_s {
+    dtlog_meta_t m;
+
+    dtlog_rule_set_t rule_set;
+    /* Map table index to dtlog_table_t. */
+    DTLOG_T2(dtlog_map_t, int32_t, dtlog_table_t*) tables;
+
+    bool (*ext_func)(   /* To reset state if last 3 param are NULL. */
+        struct dtlog_engine_s* eng,
+        dtlog_table_t*, dtlog_table_t*, dtlog_table_t*);
+
+    dtlog_io_t io;
+} dtlog_engine_t;
+
+typedef struct dtlog_join_param_s {
+    dtlog_meta_t m;
+
+    dtlog_ints_t* index2;
+    DTLOG_T(dtlog_ary_t, dtlog_value_t*) select1;
+    DTLOG_T(dtlog_ary_t, int32_t) select1i; /* Match select1 in size. */
+    DTLOG_T(dtlog_ary_t, int32_t) rem1;
+    DTLOG_T(dtlog_ary_t, int32_t) rem2;
+    DTLOG_T(dtlog_ary_t, int32_t) out_param;
+    bool full_join;
+} dtlog_join_param_t;
+
+/* --------------------------------------------------------------------------
+ * PROTOTYPES
+ * --------------------------------------------------------------------------
+ */
+
+/* What will be defined as external function depends on whether there is
+ * unit test calling into it.
+ */
+
+void dtlog_topo_sort(
+    /* This is dependency map of type value->(set of values). */
+    DTLOG_T2(dtlog_map_t*, dtlog_value_t*, dtlog_set_t*) g,
+    /* The following is output of sort. */
+    DTLOG_T(dtlog_ary_t*, dtlog_value_t*) order,
+    DTLOG_T(dtlog_set_t*, dtlog_value_t*) in_nodes,
+    DTLOG_T(dtlog_set_t*, dtlog_value_t*) out_nodes);
+
+void dtlog_sort_array(
+    int32_t start, dtlog_ary_t* list,
+    dtlog_ary_t* sem1, dtlog_ary_t* sem2);
+
+int32_t dtlog_insert_item(
+    int32_t val, dtlog_ary_t* list, void* obj1,
+    void* obj2, dtlog_ary_t* sem1, dtlog_ary_t* sem2);
+
+void dtlog_sync_init(const char* rules, dtlog_set_t* gv);
+void dtlog_sync_parse(dtlog_map_t* sem);
+void dtlog_sem_process(dtlog_rule_set_t* rule_set, dtlog_map_t* sem);
+
+dtlog_engine_t* dtlog_eng_parse(const char* rules, dtlog_set_t* gv);
+void dtlog_eng_set_ext_func(dtlog_engine_t* eng, void* func);
+
+void dtlog_set_global_value(dtlog_set_t* s);
+int32_t dtlog_hash_code(dtlog_hash_t* m, const void* v);
+int32_t dtlog_hash_code_byte(const void* v, int32_t* size);
+bool dtlog_key_equal(dtlog_hash_t* m, const void* k1, const void* k2);
+void dtlog_coll_free(void* coll, int32_t type, dtlog_set_t* gv);
+
+dtlog_map_t* dtlog_hash_init(
+    dtlog_map_t* m, int32_t type,
+    int32_t sz_init, dtlog_hash_t*);
+
+void dtlog_hash_free(dtlog_hash_t*);
+void dtlog_hash_add(dtlog_map_t* m, void* k, void* v);
+void* dtlog_hash_del(dtlog_map_t* m, void* k);
+void dtlog_hash_rehash(dtlog_map_t* m);
+void* dtlog_hash_get_one(dtlog_map_t* m);
+dtlog_map_node_t* dtlog_hash_next(dtlog_map_t* m, bool remove, int32_t* b,
+    dtlog_map_node_t** pre);
+
+dtlog_bits_t* dtlog_bitset_init(dtlog_bits_t* set);
+void dtlog_bitset_free(dtlog_bits_t*);
+
+dtlog_buf_t* dtlog_buf_init(dtlog_buf_t*);
+void dtlog_buf_free(dtlog_buf_t*);
+void dtlog_buf_reset(dtlog_buf_t*);
+void dtlog_buf_ensure(dtlog_buf_t* buf, int32_t more);
+
+dtlog_ary_t* dtlog_array_init(
+    dtlog_ary_t* a, int32_t type,
+    int32_t i_size, dtlog_set_t* gv);
+dtlog_ary_t* dtlog_array_clone(dtlog_ary_t* a);
+
+void dtlog_array_free(dtlog_ary_t*);
+int32_t dtlog_array_look_for(dtlog_ary_t* a, void* v);
+
+dtlog_ints_t* dtlog_int_tuple_init(DTLOG_T(dtlog_ary_t*, int32_t) a);
+void dtlog_int_tuple_free(dtlog_ints_t*);
+
+dtlog_value_t* dtlog_value_init(
+    const char* v, int32_t size, dtlog_set_t* gv);
+
+dtlog_tuple_t* dtlog_tuple_init(int32_t n_values);
+void dtlog_tuple_free(dtlog_tuple_t*, dtlog_set_t*, bool);
+dtlog_tuple_t* dtlog_tuple_init_val(dtlog_value_t** val, int32_t n_values);
+dtlog_tuple_t* dtlog_tuple_init_str(const char* t, dtlog_set_t* gv);
+
+void dtlog_tuple_print_raw(
+    dtlog_buf_t* buf, dtlog_tuple_t* t, int32_t start);
+
+dtlog_tuple_t* dtlog_tuple_init_str_raw(
+    const char* t, int32_t* len,
+    dtlog_value_t* extra_key,
+    dtlog_value_t* null_str, dtlog_set_t* gv);
+
+dtlog_table_t* dtlog_table_init(
+    dtlog_table_t* tbl, int32_t n, int32_t f,
+    int32_t size, dtlog_set_t* gv);
+
+void dtlog_table_free(dtlog_table_t* tbl);
+void dtlog_table_add(dtlog_table_t* tbl, dtlog_tuple_t* t);
+void dtlog_table_remove(dtlog_table_t* tbl, dtlog_tuple_t* t);
+
+int32_t dtlog_table_add_index(
+    dtlog_table_t* tbl, dtlog_ints_t* index_key);
+dtlog_tuple_t* dtlog_index_get_index(
+    dtlog_table_t* tbl, dtlog_tuple_t* t, int32_t i_idx);
+dtlog_tuple_t* dtlog_index_get_index_tuple(
+    dtlog_tuple_t* t, dtlog_ints_t* def);
+
+void dtlog_rule_free(dtlog_rule_t* rule);
+void dtlog_rule_set_init(dtlog_rule_set_t* rs, dtlog_set_t* gv);
+void dtlog_rule_set_free(dtlog_rule_set_t*);
+
+dtlog_engine_t* dtlog_engine_init(dtlog_engine_t* log, dtlog_set_t* gv);
+void dtlog_engine_free(dtlog_engine_t*);
+
+dtlog_join_param_t* dtlog_join_param_init(
+    dtlog_join_param_t* jp,
+    dtlog_ints_t* i2, dtlog_set_t* gv);
+void dtlog_join_param_free(dtlog_join_param_t*);
+
+dtlog_table_t* dtlog_tblopr_join(
+    dtlog_table_t* t1, dtlog_table_t* t2,
+    dtlog_join_param_t* joinp);
+
+int32_t dtlog_get_table_id(
+    dtlog_engine_t* eng, const char* name, int32_t len);
+
+void dtlog_eng_do_join(
+    dtlog_engine_t* eng,
+    dtlog_table_t* input, dtlog_table_t* output);
+void dtlog_eng_do_union(dtlog_engine_t* eng,
+    dtlog_table_t* input, dtlog_table_t* output);
+
+DTLOG_T(dtlog_ary_t*, dtlog_table_t*) dtlog_eng_delta(
+    dtlog_engine_t* eng,
+    DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_remove,
+    DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_insert);
+
+DTLOG_T(dtlog_ary_t*, dtlog_table_t*) dtlog_eng_query(
+    dtlog_engine_t* eng,
+    DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) input);
+
+dtlog_table_t* dtlog_get_org_table(dtlog_engine_t* eng, dtlog_table_t* t);
+dtlog_tuple_t* dtlog_query_on0(
+    dtlog_engine_t* eng, int32_t tid, dtlog_value_t* value);
+
+void dtlog_io_marshall(const char* val, int32_t sz, dtlog_buf_t*);
+int32_t dtlog_io_unmarshall(char* buf, int32_t sz);
+
+dtlog_buf_t* dtlog_io_encode(
+    dtlog_engine_t* eng,
+    DTLOG_T(dtlog_ary_t*, dtlog_table_t*) out);
+
+void dtlog_io_encode_extra(
+    dtlog_engine_t* eng,
+    DTLOG_T(dtlog_ary_t*, dtlog_table_t*) out,
+    DTLOG_T2(dtlog_map_t*, dtlog_value_t*, dtlog_buf_t*) bufs);
+
+bool dtlog_io_decode_0(
+    dtlog_engine_t* eng, const char* buf,
+    int32_t buf_sz, dtlog_value_t* extra_key,
+    dtlog_value_t* null_str, bool check,
+    DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_remove,
+    DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_insert);
+
+void dtlog_put_value(dtlog_engine_t* eng, dtlog_value_t* value);
+
+int32_t dtlog_coll_print(char*, int32_t, void* v, int32_t type, bool verbose);
+int32_t dtlog_array_print(char*, int32_t, dtlog_ary_t* a, bool verbose);
+int32_t dtlog_hash_print(char*, int32_t, dtlog_hash_t* m, bool verbose);
+int32_t dtlog_table_print(char*, int32_t, dtlog_table_t* t, bool verbose);
+int32_t dtlog_index_print(char*, int32_t, dtlog_table_t* t);
+int32_t dtlog_tuple_print(char*, int32_t, dtlog_tuple_t* t);
+int32_t dtlog_rule_set_print(char*, int32_t, dtlog_rule_set_t* rs);
+int32_t dtlog_buf_print(char*, int32_t, dtlog_buf_t* item);
+
+/* --------------------------------------------------------------------------
+ * INLINE IMPLEMENTATIONS
+ * --------------------------------------------------------------------------
+ */
+
+#define dtlog_map_size(m)           ((m)->size)
+#define dtlog_map_has(m, k)         (dtlog_hash_get(m, k) != NULL)
+#define dtlog_map_free(m)           dtlog_hash_free(m)
+#define dtlog_map_del(m, k)         dtlog_hash_del(m, k)
+
+#define dtlog_set_size(m)           ((m)->size)
+#define dtlog_set_has(m, k)         (dtlog_hash_get(m, k) != NULL)
+#define dtlog_set_free(m)           dtlog_hash_free(m)
+#define dtlog_set_del(m, k)         dtlog_hash_del(m, k)
+
+#define dtlog_array_size(a)         ((a)->size)
+#define dtlog_table_size(t)         (dtlog_set_size(&(t)->tuples))
+
+#define dtlog_map_get_int(m, k)     dtlog_p2i(dtlog_map_get(m, k))
+#define dtlog_array_get_int(a, i)   dtlog_p2i(dtlog_array_get(a, i))
+
+#define dtlog_index_i_pre(tuple, i) ((tuple)->indexes[(i) * 2])
+#define dtlog_index_i_suc(tuple, i) ((tuple)->indexes[(i) * 2 + 1])
+
+/* --------------------------------------------------------------------------
+ * META DATA
+ * --------------------------------------------------------------------------
+ */
+
+static inline void*
+dtlog_c_realloc(void* ptr, size_t old_sz, size_t new_sz)
+{
+    /* 'ptr' could be NULL and expanded area will be zeroed. */
+    void* n = realloc(ptr, new_sz);
+    memset(((char*)n) + old_sz, 0, new_sz - old_sz);
+    return n;
+}
+
+static inline void
+dtlog_hash_code_array_init(int32_t* c)
+{
+    /* Sequence of 0 will have different hash code depending on its length. */
+    *c = 1;
+}
+
+static inline void
+dtlog_hash_code_array_add(int32_t* c, int32_t i)
+{
+    *c = (*c) * 31 + i;
+}
+
+static inline void
+dtlog_hash_code_array_final(int32_t* c)
+{
+    if (*c < 0) *c = -(*c);
+}
+
+static inline void
+dtlog_coll_alloc(void* ptr, int32_t size, int32_t type, void* global_values)
+{
+    dtlog_meta_t** m = (dtlog_meta_t**)ptr;
+    if (*m == NULL) {
+        *m = malloc(size);
+        (*m)->alloc = true;
+    }
+    else {
+        (*m)->alloc = false;
+    }
+
+    (*m)->type = DTLOG_COLL(type);
+    (*m)->glb_values = global_values;
+}
+
+static inline void
+dtlog_coll_free_ptr(void* ptr)
+{
+    dtlog_meta_t* m = ptr;
+    if (m->alloc) free(m);
+}
+
+static inline void
+dtlog_value_ref(dtlog_value_t* v)
+{
+    v->ref_no++;
+}
+
+static inline void
+dtlog_value_free(dtlog_value_t* v, dtlog_set_t* gv)
+{
+    if (v->ref_no > 1) --v->ref_no;
+    else {
+        int32_t sv_type = gv->m.type;
+        gv->m.type = 0;
+        dtlog_hash_del(gv, v);
+        gv->m.type = sv_type;
+        free(v);
+    }
+}
+
+static inline void
+dtlog_check_value_ref(void* v, int32_t type, dtlog_set_t* gv, bool add)
+{
+    if (v == NULL) return;
+    if (!add) dtlog_coll_free(v, type, gv);
+    else if (type == DTLOG_T_VALUE) dtlog_value_ref((dtlog_value_t*)v);
+}
+
+/* --------------------------------------------------------------------------
+ * BITSET
+ * --------------------------------------------------------------------------
+ */
+
+static inline void
+dtlog_bitset_set(dtlog_bits_t* set, int32_t b)
+{
+    int32_t p = b >> 5;
+    if (p >= set->size) {
+        /* Since there is no reset operation, it always expands. */
+        set->items = dtlog_c_realloc(
+            set->items,
+            set->size * sizeof(int32_t), (p + 1) * sizeof(int32_t));
+        set->size = p + 1;
+    }
+
+    set->items[p] |= 1u << (b % 32);
+}
+
+static inline bool
+dtlog_bitset_get(dtlog_bits_t* set, int b)
+{
+    int p = b >> 5;
+    if (p >= set->size) return false;
+    return (set->items[p] & (1u << (b % 32))) != 0;
+}
+
+static inline bool
+dtlog_bitset_empty(dtlog_bits_t* set)
+{
+    int i;
+    for (i = 0;i < set->size;i++)
+        if (set->items[i] != 0) return false;
+    return true;
+}
+
+static inline void
+dtlog_bitset_and(dtlog_bits_t* dest, dtlog_bits_t* src)
+{
+    int i;
+    int32_t m_size = dest->size > src->size ? src->size : dest->size;
+
+    for (i = 0;i < m_size;i++) dest->items[i] &= src->items[i];
+    for (i = m_size;i < dest->size;i++) dest->items[i] = 0;
+}
+
+/* --------------------------------------------------------------------------
+ * ARRAY
+ * --------------------------------------------------------------------------
+ */
+
+static inline void
+dtlog_array_add(dtlog_ary_t* a, void* i)
+{
+    dtlog_assert(a->size <= a->len);
+
+    if (a->size == a->len) {
+        a->item = dtlog_c_realloc(a->item,
+            a->len * sizeof(void*), a->len * 2 * sizeof(void*));
+        a->len = a->len * 2;
+    }
+    a->item[a->size++] = i;
+    dtlog_check_value_ref(i, DTLOG_KTYPE(a->m.type), a->m.glb_values, true);
+}
+
+static inline void
+dtlog_array_ins(dtlog_ary_t* a, int32_t pos, void* i)
+{
+    dtlog_array_add(a, NULL); /* Make room for new item. */
+    memmove(&a->item[pos + 1], &a->item[pos],
+        (a->size - pos - 1) * sizeof(void*)); /* Size has increased by 1. */
+
+    a->item[pos] = i;
+    dtlog_check_value_ref(i, DTLOG_KTYPE(a->m.type), a->m.glb_values, true);
+}
+
+static inline void*
+dtlog_array_rmv(dtlog_ary_t* a, int32_t pos)
+{
+    /* No change to reference count. */
+    void* org = a->item[pos];
+    memmove(&a->item[pos], &a->item[pos + 1],
+        (a->size - pos - 1) * sizeof(void*));
+    a->size--;
+    return org;
+}
+
+static inline void*
+dtlog_array_get(dtlog_ary_t* a, int32_t i)
+{
+    dtlog_assert(i >= 0 && i < a->size);
+    return a->item[i];
+}
+
+static inline void
+dtlog_array_set(dtlog_ary_t* a, int i, void* v)
+{
+    dtlog_assert(i >= 0 && i < a->size);
+    dtlog_check_value_ref(a->item[i], DTLOG_KTYPE(a->m.type),
+        a->m.glb_values, false);
+
+    a->item[i] = v;
+    dtlog_check_value_ref(v, DTLOG_KTYPE(a->m.type), a->m.glb_values, true);
+}
+
+/* --------------------------------------------------------------------------
+ * HASH TABLE
+ * --------------------------------------------------------------------------
+ */
+
+static inline dtlog_map_node_t*
+dtlog_hash_get(dtlog_map_t* m, void* k)
+{
+    /* No change to reference count. */
+
+    int32_t code = dtlog_hash_code(m, k);
+    int32_t slot = code % m->len;
+    dtlog_map_node_t* head = m->bucket[slot];
+
+    while (head != NULL) {
+        if (dtlog_key_equal(m, head->key, k)) return head;
+        head = head->next;
+    }
+    return NULL;
+}
+
+/* --------------------------------------------------------------------------
+ * MAP
+ * --------------------------------------------------------------------------
+ */
+
+static inline dtlog_map_t*
+dtlog_map_init(dtlog_map_t* m, int type, int sz_init, dtlog_set_t* gv)
+{
+    return dtlog_hash_init(m, type | DTLOG_COLL(DTLOG_T_MAP), sz_init, gv);
+}
+
+static inline void*
+dtlog_map_get(dtlog_map_t* m, void* k)
+{
+    dtlog_map_node_t* node = dtlog_hash_get(m, k);
+    if (node == NULL) return NULL;
+    else return node->value;
+}
+
+static inline void
+dtlog_map_add(dtlog_map_t* m, void* k, void* v)
+{
+    dtlog_map_node_t* node = dtlog_hash_get(m, k);
+
+    if (node != NULL) {
+        void* old = node->value;
+        dtlog_check_value_ref(old, DTLOG_VTYPE(m->m.type),
+            m->m.glb_values, false);
+
+        node->value = v;
+        dtlog_check_value_ref(v, DTLOG_VTYPE(m->m.type),
+            m->m.glb_values, true);
+ + } else dtlog_hash_add(m, k, v); +} + +/* -------------------------------------------------------------------------- + * SET + * -------------------------------------------------------------------------- + */ + +static inline dtlog_set_t* +dtlog_set_init(dtlog_set_t* m, int type, int sz_init, dtlog_set_t* gv) +{ + return dtlog_hash_init(m, type | DTLOG_COLL(DTLOG_T_SET), sz_init, gv); +} + +static inline void* +dtlog_set_get(dtlog_set_t* m, void* k) +{ + dtlog_map_node_t* node = dtlog_hash_get(m, k); + if (node == NULL) return NULL; + else return node->key; +} + +static inline void +dtlog_set_add(dtlog_set_t* m, void* k) +{ + dtlog_map_node_t* node = dtlog_hash_get(m, k); + if (node != NULL) return; + dtlog_hash_add(m, k, NULL); +} + +/* -------------------------------------------------------------------------- + * ITERATIONS + * -------------------------------------------------------------------------- + */ + +/* Nested usage is supported. 'continue', 'break', 'return' could be used. */ + +#define DTLOG_ARRAY_0(array, node, type, typec) \ + { int __array_i = 0; \ + for (;__array_i < (array)->size;__array_i++) { \ + type node = typec((array)->item[__array_i]); + +#define DTLOG_MAP_ITRT(map) \ + { int32_t __map_i = 0; \ + dtlog_map_t* __map = (dtlog_map_t*)(map); \ + dtlog_map_node_t *__map_pre = NULL, *__map_cur = NULL; \ + while ((__map_cur = dtlog_hash_next(__map, \ + __map_pre != NULL && __map_cur == NULL, \ + &__map_i, &__map_pre)) != NULL) { + +#define DTLOG_MAP_ALL(map, node) DTLOG_MAP_ITRT(map) \ + struct dtlog_map_node_s* node = __map_cur; + +#define DTLOG_SET_ALL_0(set, node, type, typec) \ + DTLOG_MAP_ITRT(set) type node = typec(__map_cur->key); + +#define DTLOG_SET_DEL_ITEM /* Will not rehash. 
*/ \ + (__map->size--, __map_pre->next = __map_cur->next, \ + free(__map_cur), __map_cur = NULL) + +#define DTLOG_INDEX_ALL(tuple, i_idx, node) \ + { bool __index_first = true; \ + dtlog_tuple_t* __index_head = tuple; \ + dtlog_tuple_t* node; \ + if (__index_head != NULL) for (;;) { \ + if (__index_first) { \ + node = __index_head; __index_first = false; \ + } else { \ + (node) = (node)->indexes[i_idx * 2 + 1]; \ + if (__index_head == node) break; \ + } + +#define DTLOG_SET_END }} +#define DTLOG_ARRAY_END }} +#define DTLOG_MAP_END }} +#define DTLOG_INDEX_END }} + +/* 'n' is node, and 't' is type. */ +#define DTLOG_ARRAY_ALL(a, n, t) DTLOG_ARRAY_0(a, n, t, (t)) +#define DTLOG_ARRAY_ALL_INT(a, n) DTLOG_ARRAY_0(a, n, int32_t, dtlog_p2i) +#define DTLOG_SET_ALL(s, n, t) DTLOG_SET_ALL_0(s, n, t, (t)) +#define DTLOG_SET_ALL_INT(s, n) DTLOG_SET_ALL_0(s, n, int32_t, dtlog_p2i) + +#ifdef __cplusplus +} +#endif + +#endif /* datalog-private.h */ diff --git a/ovn/lib/datalog.c b/ovn/lib/datalog.c new file mode 100644 index 0000000..622db7f --- /dev/null +++ b/ovn/lib/datalog.c @@ -0,0 +1,3759 @@ +/* Copyright (c) 2016 VMware, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef USE_OUTSIDE_OVS +#include +#endif + +#include "./datalog.h" +#include "./datalog-private.h" + +/* -------------------------------------------------------------------------- + * BASIC COLLECTIONS + * -------------------------------------------------------------------------- + */ + +void +dtlog_coll_free(void* coll, int32_t type, dtlog_set_t* gv) +{ + switch (type) { + case DTLOG_T_VALUE: dtlog_value_free(coll, gv); break; + case DTLOG_T_TUPLE: dtlog_tuple_free(coll, gv, true); break; + + case DTLOG_T_ARRAY: dtlog_array_free(coll); break; + case DTLOG_T_MAP: + case DTLOG_T_SET: + case DTLOG_T_INDEX: dtlog_hash_free(coll); break; + + case DTLOG_T_BITSET: dtlog_bitset_free(coll); break; + case DTLOG_T_BUF: dtlog_buf_free(coll); break; + case DTLOG_T_TABLE: dtlog_table_free(coll); break; + case DTLOG_T_INT_TUPLE: dtlog_int_tuple_free(coll); break; + case DTLOG_T_RULE: dtlog_rule_free(coll); break; + case DTLOG_T_RULE_SET: dtlog_rule_set_free(coll); break; + case DTLOG_T_LOG_ENG: dtlog_engine_free(coll); break; + case DTLOG_T_JOIN_PARAM:dtlog_join_param_free(coll); break; + } +} + +/* -------------------------------------------------------------------------- + * BUFFER + * -------------------------------------------------------------------------- + */ + +dtlog_buf_t* +dtlog_buf_init(dtlog_buf_t* buf) +{ + dtlog_coll_alloc(&buf, sizeof(dtlog_buf_t), DTLOG_T_BUF, NULL); + buf->pos = 0; + buf->size = 256; + buf->buf = malloc(buf->size); + return buf; +} + +void +dtlog_buf_free(dtlog_buf_t* buf) +{ + free(buf->buf); + dtlog_coll_free_ptr(buf); +} + +void +dtlog_buf_ensure(dtlog_buf_t* buf, int32_t more) +{ + if (buf->pos + more >= buf->size) { + /* One byte more so it is safe to append \0 if needed. 
*/ + while (buf->pos + more >= buf->size) buf->size *= 2; + buf->buf = realloc(buf->buf, buf->size); + } +} + +void +dtlog_buf_reset(dtlog_buf_t* buf) +{ + buf->pos = 0; +} + +/* -------------------------------------------------------------------------- + * BITSET + * -------------------------------------------------------------------------- + */ + +dtlog_bits_t* +dtlog_bitset_init(dtlog_bits_t* set) +{ + dtlog_coll_alloc(&set, sizeof(dtlog_bits_t), DTLOG_T_BITSET, NULL); + set->size = DTLOG_SZ_INIT_BITSET; + set->items = calloc(set->size, sizeof(int32_t)); + return set; +} + +void +dtlog_bitset_free(dtlog_bits_t* dest) +{ + if (dest->items != NULL) free(dest->items); + dtlog_coll_free_ptr(dest); +} + +/* -------------------------------------------------------------------------- + * ARRAY + * -------------------------------------------------------------------------- + */ + +void +dtlog_set_global_value(dtlog_set_t* s) +{ + s->m.type = DTLOG_KEY(DTLOG_T_VALUE) | DTLOG_COLL(DTLOG_T_SET); + s->m.glb_values = s; +} + +dtlog_ary_t* +dtlog_array_init(dtlog_ary_t* a, int32_t type, + int32_t i_size, dtlog_set_t* gv) +{ + dtlog_coll_alloc(&a, sizeof(dtlog_ary_t), DTLOG_T_ARRAY, gv); + + a->len = i_size; + a->m.type |= type; + if (a->len == 0) a->len = DTLOG_SZ_INIT_ARRAY; + a->item = calloc(a->len, sizeof(void*)); + a->size = 0; + return a; +} + +dtlog_ary_t* +dtlog_array_clone(dtlog_ary_t* a) +{ + int i; + dtlog_ary_t* na = dtlog_array_init(NULL, a->m.type, + a->size, a->m.glb_values); + + memcpy(na->item, a->item, a->size * sizeof(void*)); + na->size = a->size; + + for (i = 0;i < a->size;i++) + dtlog_check_value_ref(na->item[i], DTLOG_KTYPE(na->m.type), + na->m.glb_values, true); + return na; +} + +void +dtlog_array_free(dtlog_ary_t* a) +{ + int i; + for (i = 0;i < a->size;i++) { + dtlog_check_value_ref(a->item[i], DTLOG_KTYPE(a->m.type), + a->m.glb_values, false); + } + + free(a->item); + dtlog_coll_free_ptr(a); +} + +int32_t +dtlog_array_look_for(dtlog_ary_t* a, void* v) +{ + int i = 0; + bool
found = false; + + DTLOG_ARRAY_ALL(a, item, const void*) + if (item == v) { + found = true; + break; + } + i++; + DTLOG_ARRAY_END + + if (found) return i; + else return -1; +} + +/* Print functions are for debugging only and assume 'buf' is large + * enough. They are to be changed to use log_buf_t later. + */ + +int32_t +dtlog_array_print(char* buf, int32_t pos, dtlog_ary_t* a, bool verbose) +{ + int32_t i = 0; + buf[pos++] = '['; + int32_t ktype = DTLOG_KTYPE(a->m.type); + + for (i = 0; i < a->size; i++) { + if (i > 0) buf[pos++] = ','; + pos = dtlog_coll_print(buf, pos, a->item[i], ktype, verbose); + } + buf[pos++] = ']'; + return pos; +} + +/* -------------------------------------------------------------------------- + * HASH CODE + * -------------------------------------------------------------------------- + */ + +int32_t +dtlog_hash_code_byte(const void* v, int32_t* size) +{ + /* A size of 0 indicates a null-terminated string; its actual size, not + * including the null char, is returned in 'size'. + */ + + const unsigned char* p = v; + uint32_t hash = 0; + int32_t i = 0; + + /* Jenkins's one-at-a-time hash. */ + if (*size == 0) { + unsigned char c; + while ((c = *p++) != 0) { + i++; + hash += c; + hash += (hash << 10); + hash ^= (hash >> 6); + } + *size = i; + } + else { + int32_t s = *size; + for (;i < s;i++) { + hash += *p++; + hash += (hash << 10); + hash ^= (hash >> 6); + } + } + + hash += (hash << 3); + hash ^= (hash >> 11); + hash += (hash << 15); + return (int32_t) hash < 0 ? -hash : hash; +} + +bool +dtlog_key_equal(dtlog_hash_t* m, const void* k1, const void* k2) +{ + /* If only one key is from hash_table, it must be 'k1'.
*/ + int type = DTLOG_KTYPE(m->m.type); + + if (type == DTLOG_T_VALUE) { + return k1 == k2; + } + else if (type == DTLOG_T_TUPLE) { + const dtlog_tuple_t* t1 = k1; + const dtlog_tuple_t* t2 = k2; + + if (t1->hash_code != t2->hash_code) return false; + int32_t sz_tuples = + (((dtlog_table_t*)(m->aux))->num_fields) * sizeof(void*); + return memcmp(&t1->values, &t2->values, sz_tuples) == 0; + } + + else if (DTLOG_CTYPE(m->m.type) == DTLOG_T_INDEX) { + const dtlog_tuple_t* t1 = k1; + const dtlog_tuple_t* t2 = k2; + + int32_t i; + dtlog_ints_t* index_def = m->aux; + + if (t1 == t2) return true; + if (t2->count == 0) { /* Compact form - only contains keys. */ + for (i = 0;i < index_def->n_items;i++) + if (t1->values[index_def->values[i]] != t2->values[i]) + return false; + } + else { + for (i = 0;i < index_def->n_items;i++) + if (t1->values[index_def->values[i]] != + t2->values[index_def->values[i]]) { + return false; + } + } + return true; + } + + else if (m == m->m.glb_values) { + const dtlog_value_t* v1 = k1; + const dtlog_value_t* v2 = k2; + + if (v1->size == 0 && v2->size == 0) return true; + if (v1->size >= 0 && v2->size >= 0) return k1 == k2; + if (v1->hash_code != v2->hash_code) return false; + + const void* p1 = v1->size > 0 ? v1->value.a : v1->value.p; + const void* p2 = v2->size > 0 ? v2->value.a : v2->value.p; + + int32_t s1 = v1->size > 0 ? v1->size : (-v1->size); + int32_t s2 = v2->size > 0 ? 
v2->size : (-v2->size); + if (s1 != s2) return false; + return memcmp(p1, p2, s1) == 0; + } + + else if (type == DTLOG_T_INT_TUPLE) { + const dtlog_ints_t* t1 = k1; + const dtlog_ints_t* t2 = k2; + + if (t1->hash_code != t2->hash_code) return false; + if (t1->n_items != t2->n_items) return false; + + int s = t1->n_items * sizeof(int32_t); + return memcmp(&t1->values, &t2->values, s) == 0; + } + + else if (type == DTLOG_T_INT32 || type == DTLOG_T_TST_INT32) + return dtlog_p2i((void*)k1) == dtlog_p2i((void*)k2); + + else if (type == DTLOG_T_STR) return strcmp(k1, k2) == 0; + + dtlog_assert(false); + return false; +} + +int32_t +dtlog_hash_code(dtlog_hash_t* m, const void* v) +{ + int32_t type = DTLOG_KTYPE(m->m.type); + if (type == DTLOG_T_STR) { + int32_t size0 = 0; + return dtlog_hash_code_byte(v, &size0); + } + + else if (DTLOG_CTYPE(m->m.type) == DTLOG_T_INDEX) { + int32_t i, code; + dtlog_tuple_t* t = (dtlog_tuple_t*)v; + + dtlog_ints_t* index_def = m->aux; + dtlog_hash_code_array_init(&code); + + if (t->count == 0) { /* Compact form. */ + for (i = 0; i < index_def->n_items; i++) { + dtlog_value_t* v = t->values[i]; + dtlog_hash_code_array_add(&code, v->hash_code); + } + } else { + for (i = 0; i < index_def->n_items; i++) { + dtlog_value_t* v = t->values[index_def->values[i]]; + dtlog_hash_code_array_add(&code, v->hash_code); + } + } + + dtlog_hash_code_array_final(&code); + return code; + } + + else if (type == DTLOG_T_INT32) { + int32_t n = dtlog_p2i((void*)v); + return n < 0 ? 
-n : n; + + } else if (type == DTLOG_T_TST_INT32) + return dtlog_p2i((void*)v) % 100; + else + return *(const int32_t*) v; +} + +/* -------------------------------------------------------------------------- + * HASH TABLE + * -------------------------------------------------------------------------- + */ + +dtlog_map_t* +dtlog_hash_init(dtlog_map_t* m, int type, int sz_init, + struct dtlog_hash_s* values) +{ + dtlog_coll_alloc(&m, sizeof(dtlog_map_t), 0, values); + m->m.type = type; + m->size = 0; + m->aux = NULL; + m->len = sz_init == 0 ? DTLOG_SZ_INIT_HASH : sz_init; + m->bucket = calloc(m->len, sizeof(void*)); + return m; +} + +void +dtlog_hash_free(dtlog_map_t* m) +{ + int32_t i; + int32_t k_type = DTLOG_KTYPE(m->m.type); + int32_t v_type = DTLOG_VTYPE(m->m.type); + int32_t c_type = DTLOG_CTYPE(m->m.type); + + for (i = 0;i < m->len;i++) { + dtlog_map_node_t* head = m->bucket[i]; + + while (head != NULL) { + if (m == m->m.glb_values) free((void*)head->key); + else { + void* key = c_type != DTLOG_T_INDEX ? head->key : NULL; + void* value = c_type == DTLOG_T_MAP ? head->value : NULL; + dtlog_check_value_ref(key, k_type, m->m.glb_values, false); + dtlog_check_value_ref(value, v_type, m->m.glb_values, false); + } + + dtlog_map_node_t* next = head->next; + free(head); + head = next; + } + } + + free(m->bucket); + dtlog_coll_free_ptr(m); +} + +void +dtlog_hash_rehash(dtlog_map_t* m) +{ + int nlen; + if (m->size > m->len * 2 / 3) nlen = m->len * 2; + else if (m->size < m->len / 5 && m->size > 50) nlen = m->len / 2; + else return; /* 50 is an arbitrary number. */ + + int32_t i; + dtlog_map_node_t** nb = calloc(nlen, sizeof(void*)); + + for (i = 0;i < m->len;i++) { + dtlog_map_node_t* head = m->bucket[i]; + + while (head != NULL) { + dtlog_map_node_t* next = head->next; + + int32_t b; + if (DTLOG_CTYPE(m->m.type) == DTLOG_T_INDEX) + b = dtlog_p2i(head->value); + else b = dtlog_hash_code(m, head->key); + + /* The list is reversed in some sense. 
*/ + dtlog_map_node_t* nhead = nb[b % nlen]; + if (nhead == NULL) head->next = NULL; + else head->next = nhead; + + nb[b % nlen] = head; + head = next; + } + } + + free(m->bucket); + m->bucket = nb; + m->len = nlen; +} + +dtlog_map_node_t* +dtlog_hash_next(dtlog_map_t* m, bool remove, int32_t* b, + dtlog_map_node_t** pre) +{ + /* Returns NULL if there is no next item. 'remove' indicates that the + * previous value returned by this call has been freed. */ + + int32_t i = *b; + for (;;) { + if (*pre == NULL || (*pre)->next == NULL) { + if (remove) i++; + while (i < m->len && m->bucket[i] == NULL) i++; + if (i >= m->len) return NULL; + + *b = i; + *pre = (dtlog_map_node_t*)&m->bucket[i]; + return m->bucket[i]; + } + + if (!remove) *pre = (*pre)->next; + if ((*pre)->next) return (*pre)->next; + i++; + } + return NULL; /* Never reached. */ +} + +void +dtlog_hash_add(dtlog_map_t* m, void* k, void* v) +{ + /* Caller should check key existence before calling this. */ + + int32_t type = DTLOG_CTYPE(m->m.type); + int32_t code = dtlog_hash_code(m, k); + int32_t slot = code % m->len; + + dtlog_map_node_t* item; + dtlog_map_node_t* head = m->bucket[slot]; + + if (type == DTLOG_T_MAP) { + item = malloc(sizeof (dtlog_map_node_t)); + item->value = v; + } + else if (type == DTLOG_T_SET) { + item = malloc(sizeof (dtlog_set_node_t)); + } + else if (type == DTLOG_T_INDEX) { + item = malloc(sizeof (dtlog_index_node_t)); + item->value = dtlog_i2p(code); + } + else dtlog_assert(false); + + item->key = k; + item->next = head; + m->bucket[slot] = item; + m->size++; + + dtlog_check_value_ref(k, DTLOG_KTYPE(m->m.type), m->m.glb_values, true); + dtlog_check_value_ref(v, DTLOG_VTYPE(m->m.type), m->m.glb_values, true); + dtlog_hash_rehash(m); +} + +void* +dtlog_hash_del(dtlog_map_t* m, void* k) +{ + /* Reference counter of value will NOT be changed. Key is freed.
*/ + + int32_t slot = dtlog_hash_code(m, k) % m->len; + dtlog_map_node_t* head = m->bucket[slot]; + dtlog_map_node_t* pre = NULL; + + while (head != NULL) { + if (dtlog_key_equal(m, head->key, k)) { + + if (pre == NULL) m->bucket[slot] = head->next; + else pre->next = head->next; + + const void* value = DTLOG_CTYPE(m->m.type) == DTLOG_T_MAP ? + head->value : NULL; + free(head); + + dtlog_check_value_ref(k, DTLOG_KTYPE(m->m.type), + m->m.glb_values, false); + + /* Value's reference will not be changed, so that caller may + * still use the value, even only for free. */ + m->size--; + dtlog_hash_rehash(m); + return (void*)value; + } + pre = head; + head = head->next; + } + return NULL; +} + +void* +dtlog_hash_get_one(dtlog_map_t* m) +{ + if (m->size == 0) return NULL; + DTLOG_MAP_ALL(m, node) + return node->key; + DTLOG_MAP_END + + return NULL; /* Not reachable. */ +} + +int32_t +dtlog_hash_print(char* buf, int32_t pos, dtlog_hash_t* m, bool verbose) +{ + int i, j = 0; + int32_t ktype = DTLOG_KTYPE(m->m.type); + int32_t vtype = DTLOG_VTYPE(m->m.type); + int32_t htype = DTLOG_CTYPE(m->m.type); + + if (verbose) + pos += sprintf(buf + pos, " hash %s, size=%d, len=%d\n", + htype == DTLOG_T_SET ? 
"set" : "map", m->size, m->len); + else buf[pos++] = '{'; + + for (i = 0;i < m->len;i++) { + dtlog_map_node_t* head = m->bucket[i]; + if (head == NULL) continue; + if (verbose) pos += sprintf(buf + pos, " [%d] ", i); + + while (head != NULL) { + if (j > 0 && !verbose) buf[pos++] = ','; + if (verbose) + pos += sprintf(buf + pos, " (%x)", + dtlog_hash_code(m, head->key)); + + pos = dtlog_coll_print(buf, pos, head->key, ktype, verbose); + if (htype == DTLOG_T_MAP) { + pos += sprintf(buf + pos, "->"); + pos = dtlog_coll_print(buf, pos, head->value, vtype, verbose); + } + + head = head->next; + if (++j > 200) { + pos += sprintf(buf + pos, " ..."); + if (verbose) pos += sprintf(buf + pos, "\n"); + return pos; + } + } + if (verbose) pos += sprintf(buf + pos, "\n"); + } + + if (!verbose) buf[pos++] = '}'; + return pos; +} + +/* For serialization. */ +dtlog_config_t dtlog_config = { .sep1 = ':', .sep2 = '\n', .esc = '@' }; + +/* -------------------------------------------------------------------------- + * VALUES, INDEXES, AND TABLES + * -------------------------------------------------------------------------- + */ + +/* + * For tuple used as key passed to hash_* function, tuple->count: + * == 0 indicates compact form, i.e., tuple only contains key fields, and + * hash_code has not been provisioned. + * != 0 indicates tuple is from a table and hash_code is NOT the hash code + * for the key fields (instead, it is the hash code for tuple). 
+ */ + +dtlog_ints_t* +dtlog_int_tuple_init(DTLOG_T(dtlog_ary_t*, int32_t) a) +{ + dtlog_ints_t* it = calloc(1, + sizeof(dtlog_ints_t) + sizeof(int32_t) * dtlog_array_size(a)); + it->n_items = dtlog_array_size(a); + + int32_t code; + dtlog_hash_code_array_init(&code); + + int32_t i = 0; + DTLOG_ARRAY_ALL_INT(a, v) + it->values[i++] = v; + dtlog_hash_code_array_add(&code, v); + DTLOG_ARRAY_END + + dtlog_hash_code_array_final(&code); + it->hash_code = code; + return it; +} + +static dtlog_ints_t* +dtlog_int_tuple_clone(dtlog_ints_t* i) +{ + int32_t sz = sizeof(dtlog_ints_t) + sizeof(int32_t) * i->n_items; + dtlog_ints_t* it = malloc(sz); + memcpy(it, i, sz); + return it; +} + +void +dtlog_int_tuple_free(dtlog_ints_t* t) +{ + free(t); +} + +static int32_t +dtlog_int_tuple_print(char* buf, int32_t pos, dtlog_ints_t* t) +{ + int32_t i; + buf[pos++] = '['; + + for (i = 0;i < t->n_items;i++) { + if (i > 0) buf[pos++] = ','; + pos += sprintf(buf + pos, "%d", t->values[i]); + } + buf[pos++] = ']'; + return pos; +} + +dtlog_value_t* +dtlog_value_init(const char* v, int32_t size, dtlog_set_t* gv) +{ + /* There are two types of values: null terminating string (size == 0) + * and byte array (size indicates the size of byte array). When size is + * 0, *v must be 0 for null string. Data will be copied and saved in + * global value set. The reference is increased after calling this. + */ + + dtlog_value_t inp; + inp.value.p = (char*)v; + inp.size = size; + inp.hash_code = dtlog_hash_code_byte(v, &inp.size); + inp.size = -inp.size; /* Using value.p. */ + + int32_t sv_type = gv->m.type; + gv->m.type = 0; + + dtlog_value_t* setv = dtlog_set_get(gv, &inp); + gv->m.type = sv_type; + + if (setv != NULL) { + setv->ref_no++; + return setv; + } + + inp.size = -inp.size; + int32_t offset = inp.value.a - ((char*)&inp); + setv = calloc(1, offset + inp.size + 1); + memcpy(setv, &inp, offset); + + memcpy(setv->value.a, v, inp.size); + /* Make it null terminate string for debugging. 
*/ + *((char*)setv->value.a + inp.size) = 0; + + setv->ref_no = 0; + dtlog_hash_add(gv, setv, NULL); + return setv; +} + +static int32_t +dtlog_value_print(char* buf, int32_t pos, dtlog_value_t* value, bool verbose) +{ + if (value == NULL) return pos + sprintf(buf + pos, ""); + else if (verbose) + return pos + sprintf(buf + pos, "%s", + value->value.a, value->ref_no, value->size); + else return pos + sprintf(buf + pos, "%s", value->value.a); +} + +static dtlog_rule_t* +dtlog_rule_init(dtlog_rule_t* rule, dtlog_set_t* gv) +{ + dtlog_coll_alloc(&rule, sizeof(dtlog_rule_t), DTLOG_T_RULE, gv); + rule->is_union = false; + + dtlog_array_init(&rule->rule, DTLOG_T_INT32, 0, gv); + dtlog_array_init(&rule->param, DTLOG_T_ARRAY, 0, gv); + dtlog_map_init(&rule->const_param, DTLOG_MAP_I2V, 0, gv); + dtlog_map_init(&rule->param_name_map, DTLOG_MAP_I2V, 0, gv); + return rule; +} + +void +dtlog_rule_free(dtlog_rule_t* rule) +{ + dtlog_array_free(&rule->rule); + dtlog_array_free(&rule->param); + dtlog_map_free(&rule->const_param); + dtlog_map_free(&rule->param_name_map); + dtlog_coll_free_ptr(rule); +} + +void +dtlog_rule_set_init(dtlog_rule_set_t* rs, dtlog_set_t* gv) +{ + dtlog_coll_alloc(&rs, sizeof(dtlog_rule_set_t), DTLOG_T_RULE_SET, gv); + dtlog_map_init(&rs->rule_name_map, DTLOG_MAP_I2V, 0, gv); + + dtlog_map_init(&rs->rule_index_map, + DTLOG_KEY(DTLOG_T_VALUE) | DTLOG_VALUE(DTLOG_T_INT32), 0, gv); + + dtlog_map_init(&rs->rules, + DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_RULE), 0, gv); + + dtlog_map_init(&rs->table_rule_map, + DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_ARRAY), 0, gv); + + dtlog_set_init(&rs->input_tables, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + dtlog_set_init(&rs->output_tables, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + + dtlog_map_init(&rs->param_size, + DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_INT32), 0, gv); +} + +void +dtlog_rule_set_free(dtlog_rule_set_t* rs) +{ + dtlog_map_free(&rs->rule_name_map); + dtlog_map_free(&rs->rule_index_map); + 
dtlog_map_free(&rs->rules); + dtlog_map_free(&rs->table_rule_map); + dtlog_set_free(&rs->input_tables); + dtlog_set_free(&rs->output_tables); + dtlog_map_free(&rs->param_size); + dtlog_coll_free_ptr(rs); +} + +dtlog_engine_t* +dtlog_engine_init(dtlog_engine_t* log, dtlog_set_t* gv) +{ + dtlog_coll_alloc(&log, sizeof(dtlog_engine_t), DTLOG_T_LOG_ENG, gv); + dtlog_map_init(&log->tables, + DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_TABLE), 0, gv); + + dtlog_rule_set_init(&log->rule_set, gv); + log->ext_func = NULL; + return log; +} + +void +dtlog_engine_free(dtlog_engine_t* log) +{ + dtlog_map_free(&log->tables); + dtlog_rule_set_free(&log->rule_set); + dtlog_coll_free_ptr(log); +} + +/* -------------------------------------------------------------------------- + * TUPLES + * -------------------------------------------------------------------------- + */ + +/* Indexes will never be manipulated directly. Use dtlog_table_add|remove. */ + +void +dtlog_tuple_free(dtlog_tuple_t* t, dtlog_set_t* values, bool free_val) +{ + /* 'free_val' indicates if values should be freed. */ + + int32_t i; + if (free_val) { + for (i = 0;i < t->n_values;i++) + if (t->values[i] != NULL) dtlog_value_free(t->values[i], values); + } + + if (t->indexes != NULL) free(t->indexes); + free(t); +} + +static void +dtlog_tuple_set_hash_code(dtlog_tuple_t* t, int32_t n_values) +{ + int32_t i, code; + dtlog_hash_code_array_init(&code); + + for (i = 0;i < n_values;i++) { + /* NULL is used only for specifying query condition. */ + if (t->values[i] != NULL) + dtlog_hash_code_array_add(&code, t->values[i]->hash_code); + } + + dtlog_hash_code_array_final(&code); + t->hash_code = code; +} + +dtlog_tuple_t* +dtlog_tuple_init(int32_t n_values) +{ + dtlog_tuple_t* tuple = + calloc(1, sizeof(dtlog_tuple_t) + sizeof(void*) * n_values); + tuple->n_values = n_values; + return tuple; +} + +dtlog_tuple_t* +dtlog_tuple_init_val(dtlog_value_t** val, int32_t n_values) +{ + /* Reference will not be updated. 
*/ + + dtlog_tuple_t* tuple = dtlog_tuple_init(n_values); + if (val != NULL) { + memcpy(tuple->values, val, sizeof(void*) * n_values); + dtlog_tuple_set_hash_code(tuple, n_values); + } + return tuple; +} + +static dtlog_tuple_t* +dtlog_tuple_clone(dtlog_tuple_t* tuple) +{ + int32_t i; + dtlog_tuple_t* nt = dtlog_tuple_init_val(tuple->values, tuple->n_values); + for (i = 0;i < tuple->n_values;i++) dtlog_value_ref(tuple->values[i]); + nt->count = tuple->count; + return nt; +} + +dtlog_tuple_t* +dtlog_tuple_init_str_raw(const char* t, int32_t* len, + dtlog_value_t* extra_key, dtlog_value_t* null_str, + dtlog_set_t* gv) +{ + /* The value's reference will be added. + * Input is in the form of 'n:f0: ... : fn'. Input 't' may be modified + * due to unmarshalling. + */ + + char sep1 = dtlog_config.sep1; + char sep2 = len == NULL ? '\0' : dtlog_config.sep2; + int64_t count = atoll(t); + + DTLOG_T(dtlog_ary_t, char*) pos; + dtlog_array_init(&pos, 0, 0, gv); + + char* p = (char*)t; + for (;*p != sep2;p++) + if (*p == sep1) dtlog_array_add(&pos, p); + + if (len != NULL) *len = p - t + 1; + if (t[0] < '0' || t[0] > '9') { + dtlog_array_free(&pos); + return NULL; + } + + int32_t size = dtlog_array_size(&pos); + dtlog_array_add(&pos, p); + + if (size == 0) { + dtlog_array_free(&pos); + return NULL; + } + + int offset = extra_key == NULL ? 0 : 1; + dtlog_tuple_t* tuple = dtlog_tuple_init(size + offset); + tuple->count = count; + + if (extra_key != NULL) { + tuple->values[0] = extra_key; + dtlog_value_ref(extra_key); + } + + int i; + for (i = 0;i < size;i++) { + int32_t sz = (char*)dtlog_array_get(&pos, i + 1) - + ((char*)dtlog_array_get(&pos, i) + 1); + + char* raw = (char*)dtlog_array_get(&pos, i) + 1; + int32_t new_sz = dtlog_io_unmarshall(raw, sz); + + dtlog_value_t* val = dtlog_value_init(new_sz == 0 ? "" : raw, + new_sz, gv); + tuple->values[i + offset] = null_str == val ? 
NULL : val; + } + + dtlog_array_free(&pos); + dtlog_tuple_set_hash_code(tuple, size); + return tuple; +} + +dtlog_tuple_t* +dtlog_tuple_init_str(const char* t, dtlog_set_t* gv) +{ + return dtlog_tuple_init_str_raw(t, NULL, NULL, NULL, gv); +} + +int32_t +dtlog_tuple_print(char* buf, int32_t pos, dtlog_tuple_t* t) +{ + pos += sprintf(buf + pos, "%" PRId64, t->count); + buf[pos++] = dtlog_config.sep1; + + int32_t i; + for (i = 0;i < t->n_values;i++) { + if (i > 0) buf[pos++] = dtlog_config.sep1; + pos = dtlog_value_print(buf, pos, t->values[i], false); + } + return pos; +} + +void +dtlog_tuple_print_raw(dtlog_buf_t* buf, dtlog_tuple_t* t, int32_t start) +{ + dtlog_buf_ensure(buf, 21); /* 20 is max len for long value. */ + buf->pos += sprintf(buf->buf + buf->pos, "%" PRId64, t->count); + (buf->buf)[buf->pos++] = dtlog_config.sep1; + + int32_t i; + for (i = start;i < t->n_values;i++) { + dtlog_buf_ensure(buf, 1 + t->values[i]->size); + if (i > start) (buf->buf)[buf->pos++] = dtlog_config.sep1; + dtlog_io_marshall(t->values[i]->value.a, t->values[i]->size, buf); + } + + dtlog_buf_ensure(buf, 1); + (buf->buf)[buf->pos++] = dtlog_config.sep2; +} + +int32_t +dtlog_buf_print(char* buf, int32_t pos, dtlog_buf_t* item) +{ + dtlog_buf_ensure(item, 1); + item->buf[item->pos] = '\0'; + return pos + sprintf(buf + pos, "[%s]", item->buf); +} + +/* -------------------------------------------------------------------------- + * TABLES + * -------------------------------------------------------------------------- + */ + +dtlog_table_t* +dtlog_table_init(dtlog_table_t* tbl, int32_t n, int32_t f, + int32_t size, dtlog_set_t* gv) +{ + dtlog_coll_alloc(&tbl, sizeof(dtlog_table_t), DTLOG_T_TABLE, gv); + + tbl->table_index = n; + tbl->num_fields = f; + tbl->is_remove = false; + + dtlog_map_init(&tbl->index_def, + DTLOG_KEY(DTLOG_T_INT_TUPLE) | DTLOG_VALUE(DTLOG_T_INT32), 0, gv); + dtlog_array_init(&tbl->index_map, DTLOG_KEY(DTLOG_T_INDEX), 0, gv); + dtlog_set_init(&tbl->tuples, 
DTLOG_KEY(DTLOG_T_TUPLE), size, gv); + + tbl->tuples.aux = tbl; + return tbl; +} + +void +dtlog_table_free(dtlog_table_t* tbl) +{ + dtlog_map_free(&tbl->index_def); + dtlog_array_free(&tbl->index_map); + dtlog_set_free(&tbl->tuples); + dtlog_coll_free_ptr(tbl); +} + +static void +dtlog_index_add_node0(dtlog_tuple_t* t, int32_t i_idx) +{ + /* Add the first tuple for the key. */ + dtlog_index_i_pre(t, i_idx) = dtlog_index_i_suc(t, i_idx) = t; +} + +static void +dtlog_index_add_node1(dtlog_tuple_t* t, dtlog_tuple_t* head, int32_t i_idx) +{ + /* Add a successor tuple. */ + dtlog_index_i_suc(t, i_idx) = head; + dtlog_index_i_pre(t, i_idx) = dtlog_index_i_pre(head, i_idx); + dtlog_index_i_suc(dtlog_index_i_pre(head, i_idx), i_idx) = t; + dtlog_index_i_pre(head, i_idx) = t; +} + +static bool +dtlog_index_del_node(dtlog_tuple_t* t, int32_t i_idx) +{ + /* Returns true if this is the last node in the link. */ + if (dtlog_index_i_pre(t, i_idx) == t) return true; + + dtlog_index_i_suc(dtlog_index_i_pre(t, i_idx), i_idx) = + dtlog_index_i_suc(t, i_idx); + + dtlog_index_i_pre(dtlog_index_i_suc(t, i_idx), i_idx) = + dtlog_index_i_pre(t, i_idx); + return false; +} + +static void +dtlog_index_add_tuple(dtlog_hash_t* index, int32_t i, dtlog_tuple_t* t) +{ + dtlog_map_node_t* node = dtlog_hash_get(index, t); + if (node == NULL) { + dtlog_index_add_node0(t, i); + dtlog_hash_add(index, t, NULL); + } + else { + dtlog_index_add_node1(t, (dtlog_tuple_t*)node->key, i); + node->key = t; + } +} + +dtlog_tuple_t* +dtlog_index_get_index(dtlog_table_t* tbl, dtlog_tuple_t* t, int32_t i_idx) +{ + /* 't' is key tuple.
*/ + dtlog_hash_t* index = dtlog_array_get(&tbl->index_map, i_idx); + dtlog_map_node_t* node = dtlog_hash_get(index, t); + + if (node == NULL) return NULL; + return (dtlog_tuple_t*) dtlog_hash_get(index, t)->key; +} + +dtlog_tuple_t* +dtlog_index_get_index_tuple(dtlog_tuple_t* t, dtlog_ints_t* def) +{ + int32_t i; + dtlog_tuple_t* newt = dtlog_tuple_init(def->n_items); + + for (i = 0;i < def->n_items;i++) { + dtlog_value_t* value = t->values[def->values[i]]; + newt->values[i] = value; + dtlog_value_ref(value); + } + + newt->count = 0; /* Compact form. */ + dtlog_tuple_set_hash_code(newt, def->n_items); + return newt; +} + +static void +dtlog_table_add0(dtlog_table_t* tbl, dtlog_tuple_t* t) +{ + /* Assume tuple is not present in table. Need not free 't' afterwards. */ + + dtlog_hash_add(&tbl->tuples, t, NULL); + int n_idx = dtlog_array_size(&tbl->index_def); + if (n_idx > 0) t->indexes = calloc(n_idx * 2, sizeof(void*)); + + /* Update indexes. */ + int32_t i; + for (i = 0;i < n_idx;i++) { + dtlog_hash_t* index = dtlog_array_get(&tbl->index_map, i); + dtlog_index_add_tuple(index, i, t); + } +} + +static void +dtlog_table_remove0(dtlog_table_t* tbl, dtlog_tuple_t* t) +{ + /* Assume tuple is present in table. 't' has been freed afterwards. */ + int32_t i; + + /* Update index. */ + int n_idx = dtlog_array_size(&tbl->index_def); + + for (i = 0;i < n_idx;i++) { + bool last = dtlog_index_del_node(t, i); + dtlog_hash_t* index = dtlog_array_get(&tbl->index_map, i); + dtlog_map_node_t* node = dtlog_hash_get(index, t); + + if (last) dtlog_hash_del(index, t); + else if (node->key == t) node->key = dtlog_index_i_suc(t, i); + } + + dtlog_hash_del(&tbl->tuples, t); +} + +static void +dtlog_table_add_extra(dtlog_table_t* tbl, dtlog_tuple_t* t) +{ + /* Add or merge count. Will be referred (add) or freed (merge count). 
*/ + dtlog_tuple_t* et = dtlog_set_get(&tbl->tuples, t); + if (et == NULL) { + dtlog_table_add0(tbl, t); + } else { + et->count += t->count; + dtlog_tuple_free(t, tbl->m.glb_values, true); + } +} + +void +dtlog_table_add(dtlog_table_t* tbl, dtlog_tuple_t* t) +{ + /* Add with validation. 't' should not be in table. */ + dtlog_assert(!dtlog_set_has(&tbl->tuples, t) && + tbl->num_fields == t->n_values); + + dtlog_table_add0(tbl, t); +} + +void +dtlog_table_remove(dtlog_table_t* tbl, dtlog_tuple_t* t) +{ + /* Remove with validation. Must have this and from this table. */ + dtlog_assert(dtlog_set_get(&tbl->tuples, t) == t); + dtlog_table_remove0(tbl, t); +} + +int32_t +dtlog_table_add_index(dtlog_table_t* tbl, dtlog_ints_t* index_key) +{ + /* Add new index for table or returning existing one. + * 'index_key' will be cloned. */ + + dtlog_map_node_t* index = dtlog_hash_get(&tbl->index_def, index_key); + if (index != NULL) return dtlog_p2i(index->value); + + dtlog_ints_t* ikey = dtlog_int_tuple_clone(index_key); + int32_t index_id = dtlog_map_size(&tbl->index_def); + dtlog_map_add(&tbl->index_def, ikey, dtlog_i2p(index_id)); + + dtlog_map_t* new_i = dtlog_hash_init(NULL, DTLOG_COLL(DTLOG_T_INDEX), + DTLOG_SZ_INIT_HASH, tbl->m.glb_values); + + dtlog_array_add(&tbl->index_map, new_i); + new_i->aux = ikey; + + DTLOG_SET_ALL(&tbl->tuples, tuple, dtlog_tuple_t*) + tuple->indexes = realloc(tuple->indexes, + (index_id + 1) * 2 * sizeof(void*)); + dtlog_index_add_tuple(new_i, index_id, tuple); + DTLOG_SET_END + + return index_id; +} + +int32_t +dtlog_index_print(char* buf, int32_t pos, dtlog_table_t* t) +{ + int32_t i = 0, j = 0; + + DTLOG_ARRAY_ALL(&t->index_map, index, dtlog_hash_t*) + dtlog_ints_t* def = index->aux; + pos += sprintf(buf + pos, " index="); + pos = dtlog_int_tuple_print(buf, pos, def); + buf[pos++] = '\n'; + + DTLOG_SET_ALL(index, key, dtlog_tuple_t*) + pos += sprintf(buf + pos, " key set: "); + + DTLOG_INDEX_ALL(key, i, t1) + pos = dtlog_tuple_print(buf, pos, 
t1); + buf[pos++] = ' '; + + if (++j > 200) { + pos += sprintf(buf + pos, " ...\n"); + return pos; + } + DTLOG_INDEX_END + + buf[pos++] = '\n'; + DTLOG_SET_END + i++; + DTLOG_ARRAY_END + + return pos; +} + +int32_t +dtlog_table_print(char* buf, int32_t pos, dtlog_table_t* t, bool verbose) +{ + if (verbose) { + pos += sprintf(buf + pos, " tbl id=%d fd=%d sz=%d\n", + t->table_index, t->num_fields, dtlog_table_size(t)); + } + + pos = dtlog_hash_print(buf, pos, &t->tuples, verbose); + if (verbose) pos = dtlog_index_print(buf, pos, t); + return pos; +} + +int32_t +dtlog_rule_set_print(char* buf, int32_t pos, dtlog_rule_set_t* rs) +{ + pos += sprintf(buf + pos, "rule_name_map\n"); + pos = dtlog_hash_print(buf, pos, &rs->rule_name_map, false); + + pos += sprintf(buf + pos, "\nrule_index_map\n"); + pos = dtlog_hash_print(buf, pos, &rs->rule_index_map, false); + + pos += sprintf(buf + pos, "\ntable_rule_map\n"); + pos = dtlog_hash_print(buf, pos, &rs->table_rule_map, false); + + pos += sprintf(buf + pos, "\nparam_size\n"); + pos = dtlog_hash_print(buf, pos, &rs->param_size, false); + + pos += sprintf(buf + pos, "\ninput_tables\n"); + pos = dtlog_hash_print(buf, pos, &rs->input_tables, false); + + pos += sprintf(buf + pos, "\noutput_tables\n"); + pos = dtlog_hash_print(buf, pos, &rs->output_tables, false); + + pos += sprintf(buf + pos, "\nrules\n"); + pos = dtlog_hash_print(buf, pos, &rs->rules, false); + return pos; +} + +static int32_t +dtlog_rule_print(char* buf, int32_t pos, dtlog_rule_t* r) +{ + pos += sprintf(buf + pos, r->is_union ? 
"(union," : "(join,"); + pos += sprintf(buf + pos, "rule="); + pos = dtlog_array_print(buf, pos, &r->rule, false); + pos += sprintf(buf + pos, ",param="); + pos = dtlog_array_print(buf, pos, &r->param, false); + pos += sprintf(buf + pos, ",name="); + pos = dtlog_hash_print(buf, pos, &r->param_name_map, false); + pos += sprintf(buf + pos, ",const="); + pos = dtlog_hash_print(buf, pos, &r->const_param, false); + pos += sprintf(buf + pos, ")\n"); + return pos; +} + +int32_t +dtlog_coll_print(char* buf, int pos, void* item, int32_t type, bool verbose) +{ + /* do not check the buf limit */ + if (item == NULL && type != DTLOG_T_INT32 && type != DTLOG_T_TST_INT32) + return pos; + + switch (type) { + + case DTLOG_T_INT32: + case DTLOG_T_TST_INT32: + pos += sprintf(buf + pos, "%d", dtlog_p2i(item)); + break; + + case DTLOG_T_STR: + pos += sprintf(buf + pos, "%s", (const char*)item); break; + + case DTLOG_T_VALUE: + pos = dtlog_value_print(buf, pos, item, verbose); break; + + case DTLOG_T_INT_TUPLE: + pos = dtlog_int_tuple_print(buf, pos, item); break; + + case DTLOG_T_TUPLE: + pos = dtlog_tuple_print(buf, pos, item); break; + + case DTLOG_T_ARRAY: + pos = dtlog_array_print(buf, pos, item, verbose); break; + + case DTLOG_T_MAP: + case DTLOG_T_SET: + pos = dtlog_hash_print(buf, pos, item, verbose); break; + + case DTLOG_T_TABLE: + pos = dtlog_table_print(buf, pos, item, verbose); break; + + case DTLOG_T_RULE: + pos = dtlog_rule_print(buf, pos, item); break; + + case DTLOG_T_RULE_SET: + pos = dtlog_rule_set_print(buf, pos, item); break; + + case DTLOG_T_BUF: + pos = dtlog_buf_print(buf, pos, item); break; + } + return pos; +} + +/* -------------------------------------------------------------------------- + * SYNTAX + * -------------------------------------------------------------------------- + */ + +/* Token of identifier: [a-zA-Z_][a-zA-Z0-9_]* + * Table type is determined by its case. 
All upper case is output table; all + * lower case is input table; others are intermediate table. + * + * ::= ( , ... ) : | > + * ( | 'value' | - , ... ) ... + * + * ':' is for join. '>' is for union. Order is important. + * Special table is not used now, which could be used to specify language + * parameters. Join is preferred with external function as there is more + * flexibility in parameters. Comments start with #. + */ + +struct dtlog_sync_s { + const char* text; /* Rules text to be parsed. */ + int32_t len; /* Length of text. */ + int32_t curpos; /* Current index of text. */ + + char curchar; /* Current char to be parsed. */ + char curtoken; /* Id of current token. */ + + char token[DTLOG_SZ_LOG_TOKEN]; /* Current token in string. */ + int32_t token_pos; /* Index of token. */ + dtlog_set_t* gv; +}; + +static struct dtlog_sync_s sync; + +void +dtlog_sync_init(const char* log, dtlog_set_t* gv) +{ + sync.text = log; + sync.len = strlen(log); + sync.curpos = 0; + sync.token_pos = 0; + sync.gv = gv; +} + +static void +sync_getc(void) +{ + sync.curchar = sync.text[sync.curpos++]; +} + +static bool +sync_eof(void) +{ + return sync.curpos >= sync.len; +} + +static void +sync_error(const char* s0, const dtlog_value_t* s1) +{ + printf("syntax error: %s %s \n%s\n", s0, + s1 == NULL ? 
"" : s1->value.a, + sync.text + sync.curpos); + exit(1); +} + +static void +sync_init_token(void) +{ + sync.token_pos = 0; +} + +static dtlog_value_t* +sync_get_value(void) +{ + dtlog_value_t* token = dtlog_value_init( + sync.token, sync.token_pos, sync.gv); + return token; +} + +static dtlog_value_t* +sync_get_const(const char* c) +{ + dtlog_value_t* token = dtlog_value_init(c, 0, sync.gv); + return token; +} + +static void +sync_append_c(void) +{ + sync.token[sync.token_pos++] = sync.curchar; + sync_getc(); + if (sync.token_pos >= DTLOG_SZ_LOG_TOKEN) sync_error("token too long", 0); +} + +static void +sync_gett(void) +{ + bool in_comment = false; + bool in_literal = false; + bool in_ident = false; + + while (!sync_eof()) { + if (in_comment) { + if (sync.curchar == '\n') { + in_comment = false; + } + sync_append_c(); + } + else if (in_literal) { + if (sync.curchar == '\'') { + sync_getc(); + sync.curtoken = 's'; + return; + } + else { + sync_append_c(); + } + } + else if (in_ident) { + if (sync.curchar != '_' && !isalpha(sync.curchar) + && !isdigit(sync.curchar)) { + sync.curtoken = 't'; + return; + } + sync_append_c(); + } + else { + if (sync.curchar == '#') { + in_comment = true; + sync_getc(); + } + else if (sync.curchar == '\'') { + in_literal = true; + sync_init_token(); + sync_getc(); + } + else if (sync.curchar == '_' || isalpha(sync.curchar)) { + sync_init_token(); + sync_append_c(); + in_ident = true; + } + else if (strchr(":>().,-;", sync.curchar) != NULL) { + sync.curtoken = sync.curchar; + sync_getc(); + return; + } + else if (isspace(sync.curchar)) sync_getc(); + else if (sync.curchar == '\n') sync_getc(); + else sync_error("unknown char near", 0); + } + } + + /* For last period without following chars. 
*/ + sync.curtoken = sync.curchar; +} + +static void +sync_nt_params(dtlog_ary_t* list) +{ + /* '(' item ( ',' item )* ')', where item is param | 'literal' | '-'. */ + if (sync.curtoken != '(') sync_error("expecting (", 0); + sync_gett(); + + for (;;) { + if (sync.curtoken == 't') { + dtlog_array_add(list, sync_get_const("t")); + dtlog_array_add(list, sync_get_value()); + } + else if (sync.curtoken == '-') { + dtlog_array_add(list, sync_get_const("-")); + dtlog_array_add(list, NULL); + } + else if (sync.curtoken == 's') { + dtlog_array_add(list, sync_get_const("s")); + dtlog_array_add(list, sync_get_value()); + } + else sync_error("expecting param, literal, or -", NULL); + + sync_gett(); + if (sync.curtoken == ',') { + sync_gett(); + continue; + } + else if (sync.curtoken == ')') { + sync_gett(); + return; + } + else sync_error("expecting , or )", NULL); + } +} + +static void +sync_nt_table(dtlog_ary_t* list) +{ + if (sync.curtoken != 't') sync_error("table name expected", NULL); + dtlog_value_t* table_name = sync_get_value(); + + sync_gett(); + dtlog_array_add(list, NULL); /* Will be overwritten later. */ + dtlog_array_add(list, table_name); + sync_nt_params(list); +} + +void +dtlog_sync_parse(dtlog_map_t* sem) +{ + /* 'sem' is formed in the following way: + * table name -> (left side table, right side table 0, ...) + * Each table contains (rule type, table name, param type 0, + * param value 0, ...). The rule type is "u" or "j" for the left + * side table and NULL for right side tables. + */ + + dtlog_map_init(sem, + DTLOG_KEY(DTLOG_T_VALUE) | DTLOG_VALUE(DTLOG_T_ARRAY), 0, sync.gv); + + sync_getc(); + sync_gett(); + + for (;;) { + dtlog_ary_t* tables = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_ARRAY), 0, sync.gv); + dtlog_ary_t* tbl = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_VALUE), 0, sync.gv); + + sync_nt_table(tbl); + dtlog_array_add(tables, tbl); + dtlog_value_t* table_name = dtlog_array_get(tbl, 1); + + if (sync.curtoken != ':' && sync.curtoken != '>') { + sync_error("expecting : or >", NULL); + } + + dtlog_array_set(tbl, 0, + sync_get_const(sync.curtoken == '>' ?
"u" : "j")); + sync_gett(); + + for (;;) { + tbl = dtlog_array_init(NULL, + DTLOG_KEY(DTLOG_T_VALUE), 0, sync.gv); + sync_nt_table(tbl); + dtlog_array_add(tables, tbl); + if (sync.curtoken == ';' || sync.curtoken == '.') break; + } + + if (dtlog_map_get(sem, table_name) != NULL) { + sync_error("definition existed: ", table_name); + } + + dtlog_map_add(sem, table_name, tables); + if (sync.curtoken == ';') sync_gett(); + else if (sync.curtoken == '.') return; + } +} + +/* -------------------------------------------------------------------------- + * SORTS + * -------------------------------------------------------------------------- + */ + +void +dtlog_sort_array(int32_t start, dtlog_ary_t* list, + dtlog_ary_t* sem1, dtlog_ary_t* sem2) +{ + /* Must use stable sort. See sort for table size. */ + + int32_t i, j; + for (i = start;i < list->size;i++) { + int32_t index = i; + + for (j = i + 1;j < list->size;j++) { + if (dtlog_p2i(list->item[j]) < dtlog_p2i(list->item[index])) + index = j; + } + + void* newi = (void*)list->item[index]; + void* newv1 = sem1 == NULL ? NULL : (void*)sem1->item[index]; + void* newv2 = sem2 == NULL ? NULL : (void*)sem2->item[index]; + + memmove(&list->item[i + 1], &list->item[i], + (index - i) * sizeof(void*)); + + if (sem1 != NULL) + memmove(&sem1->item[i + 1], &sem1->item[i], + (index - i) * sizeof(void*)); + if (sem2 != NULL) + memmove(&sem2->item[i + 1], &sem2->item[i], + (index - i) * sizeof(void*)); + + list->item[i] = newi; + if (sem1 != NULL) sem1->item[i] = newv1; + if (sem2 != NULL) sem2->item[i] = newv2; + } +} + +int32_t +dtlog_insert_item(int val, dtlog_ary_t* list, void* obj1, + void* obj2, dtlog_ary_t* sem1, dtlog_ary_t* sem2) +{ + /* Returns the position to insert. If there is tie, the position is + * after all items of equal value. 
*/ + + int32_t count; + for (count = 0;count < dtlog_array_size(list);count++) { + if (val < dtlog_array_get_int(list, count)) break; + } + + dtlog_array_ins(list, count, dtlog_i2p(val)); + if (sem1 != NULL) dtlog_array_ins(sem1, count, obj1); + if (sem2 != NULL) dtlog_array_ins(sem2, count, obj2); + return count; +} + +void +dtlog_topo_sort(DTLOG_T2(dtlog_map_t*, dtlog_value_t*, dtlog_set_t*) g, + DTLOG_T(dtlog_ary_t* order, dtlog_value_t*), + DTLOG_T(dtlog_set_t*, dtlog_value_t*) in_nodes, + DTLOG_T(dtlog_set_t*, dtlog_value_t*) out_nodes) +{ + /* Input g will be freed after sort. */ + + dtlog_set_t* gv = g->m.glb_values; + dtlog_array_init(order, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + dtlog_set_init(in_nodes, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + dtlog_set_init(out_nodes, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + + dtlog_set_t right_nodes, all_nodes, to_remove; + dtlog_set_init(&right_nodes, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + dtlog_set_init(&all_nodes, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + + DTLOG_MAP_ALL(g, node) + dtlog_set_add(&all_nodes, node->key); + + DTLOG_SET_ALL(node->value, node1, dtlog_value_t*) + dtlog_set_add(&all_nodes, node1); + dtlog_set_add(&right_nodes, node1); + DTLOG_SET_END + DTLOG_MAP_END + + DTLOG_SET_ALL(&all_nodes, key, dtlog_value_t*) + if (!dtlog_map_has(g, key)) dtlog_set_add(in_nodes, key); + else if (!dtlog_set_has(&right_nodes, key)) + dtlog_set_add(out_nodes, key); + DTLOG_SET_END + + while (dtlog_set_size(&all_nodes) > 0) { + dtlog_value_t* next = NULL; + + DTLOG_SET_ALL(&all_nodes, key, dtlog_value_t*) + if (!dtlog_map_has(g, key)) { + next = key; + break; + } + DTLOG_SET_END + + if (next == NULL) + sync_error("circular graph, check around ", + dtlog_hash_get_one(&all_nodes)); + + dtlog_array_add(order, next); + dtlog_set_del(&all_nodes, next); + dtlog_set_init(&to_remove, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + + DTLOG_MAP_ALL(g, node) + dtlog_set_del(node->value, next); + if (dtlog_set_size((dtlog_set_t*)node->value) == 0) + 
dtlog_set_add(&to_remove, node->key); + DTLOG_MAP_END + + DTLOG_SET_ALL(&to_remove, key, dtlog_value_t*) + dtlog_set_free(dtlog_map_del(g, key)); + DTLOG_SET_END + dtlog_set_free(&to_remove); + } + + dtlog_set_free(&right_nodes); + dtlog_set_free(&all_nodes); + dtlog_map_free(g); +} + +static int32_t +check_name(dtlog_value_t* t) +{ + /* Check if string contains all lower case chars (> 0), all upper + * case (< 0) or mixed (== 0). */ + + const char* s = t->value.a; + bool lower = false; + bool upper = false; + char c; + + while ((c = *s++) != 0) { + if (islower(c)) lower = true; + else if (isupper(c)) upper = true; + } + + if (lower && upper) return 0; + else if (lower) return 1; + else if (upper) return -1; + else return 0; +} + +/* -------------------------------------------------------------------------- + * SEMANTICS + * -------------------------------------------------------------------------- + */ + +void +dtlog_sem_process(dtlog_rule_set_t* rule_set, + DTLOG_T2(dtlog_map_t*, dtlog_value_t*, dtlog_ary_t*) sem) +{ + /* The array in 'sem' is value array. */ + dtlog_set_t* gv = sem->m.glb_values; + + /* STEP 1: Check table dependency and assign digit table / rule index. 
*/ + dtlog_map_t rules; + dtlog_map_init(&rules, + DTLOG_KEY(DTLOG_T_VALUE) | DTLOG_VALUE(DTLOG_T_SET), 0, gv); + + dtlog_value_t* rname; + DTLOG_MAP_ALL(sem, node) + rname = NULL; + dtlog_set_t* dep = dtlog_set_init( + NULL, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + + DTLOG_ARRAY_ALL((dtlog_ary_t*)node->value, t, dtlog_ary_t*) + dtlog_value_t* nm = dtlog_array_get(t, 1); + if (rname == NULL) rname = nm; + else dtlog_set_add(dep, nm); + DTLOG_ARRAY_END + + dtlog_map_add(&rules, rname, dep); + DTLOG_MAP_END + + dtlog_ary_t topo_order; + dtlog_set_t topo_in, topo_out; + dtlog_topo_sort(&rules, &topo_order, &topo_in, &topo_out); + + int32_t rule_index = 0; + DTLOG_ARRAY_ALL(&topo_order, name, dtlog_value_t*) + dtlog_map_add(&rule_set->rule_name_map, dtlog_i2p(rule_index), name); + dtlog_map_add(&rule_set->rule_index_map, name, dtlog_i2p(rule_index++)); + DTLOG_ARRAY_END + + /* STEP 2: Check upper case, lower case and mixed. */ + DTLOG_SET_ALL(&topo_in, name, dtlog_value_t*) + if (check_name(name) <= 0) + sync_error("input must be all lower case: ", name); + + dtlog_set_add(&rule_set->input_tables, + dtlog_map_get(&rule_set->rule_index_map, name)); + DTLOG_SET_END + + DTLOG_SET_ALL(&topo_out, name, dtlog_value_t*) + if (check_name(name) >= 0) + sync_error("output must be all upper case: ", name); + + dtlog_set_add(&rule_set->output_tables, + dtlog_map_get(&rule_set->rule_index_map, name)); + DTLOG_SET_END + + dtlog_set_t intr; + dtlog_set_init(&intr, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + + DTLOG_MAP_ALL(sem, name) + if (dtlog_map_has(&topo_out, dtlog_i2p(name->key))) continue; + if (check_name(dtlog_i2p(name->key)) != 0) + sync_error("intermediate must be mixed: ", name->key); + DTLOG_MAP_END + + /* STEP 3: Check table size consistency and assign digit index. + * If union / join performs on the same table, it could only perform + * twice for each table, e.g., 'R : A, A, A' is not supported. 
+ */ + dtlog_map_t table_size; + dtlog_map_init(&table_size, + DTLOG_KEY(DTLOG_T_VALUE) | DTLOG_VALUE(DTLOG_T_INT32), 0, gv); + + DTLOG_MAP_ALL(sem, name) + dtlog_ary_t* val = name->value; /* Array of array of value. */ + dtlog_rule_t* rule = dtlog_rule_init(NULL, gv); + + /* Rule_id is integer. */ + void* rule_id = dtlog_map_get(&rule_set->rule_index_map, name->key); + dtlog_map_add(&rule_set->rules, rule_id, rule); + + dtlog_value_t* rule_t = dtlog_array_get(dtlog_array_get(val, 0), 0); + rule->is_union = strcmp("u", rule_t->value.a) == 0; + + int const_value = -2; /* -1 is for 'ignore'. */ + + /* Reorder table sequence. */ + DTLOG_ARRAY_ALL(val, t, dtlog_ary_t*) + dtlog_value_t* table_name = dtlog_array_get(t, 1); + + void* rule_id = dtlog_map_get( + &rule_set->rule_index_map, table_name); + dtlog_array_add(&rule->rule, rule_id); + DTLOG_ARRAY_END + + int32_t i; + dtlog_sort_array(1, &rule->rule, val, NULL); + + for (i = 2;i < rule->rule.size - 1;i++) { + if (dtlog_array_get(&rule->rule, i - 1) == + dtlog_array_get(&rule->rule, i) && + dtlog_array_get(&rule->rule, i + 1) == + dtlog_array_get(&rule->rule, i)) + + sync_error( + "cannot join / union on itself for more than twice: ", + name->key); + } + + int32_t param_size = 0; + bool is_left = true; + bool left_has_param = false; + bool left_has_value = false; + + dtlog_map_t param_map; + dtlog_map_init(&param_map, + DTLOG_KEY(DTLOG_T_VALUE) | DTLOG_VALUE(DTLOG_T_INT32), 0, gv); + + DTLOG_ARRAY_ALL(val, t, dtlog_ary_t*) + dtlog_ary_t* param_list = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + + dtlog_array_add(&rule->param, param_list); + dtlog_value_t* table_name = dtlog_array_get(t, 1); + + int32_t size = ((dtlog_ary_t*)t)->size / 2 - 1; + int32_t table_id = dtlog_map_get_int( + &rule_set->rule_index_map, table_name); + dtlog_map_add(&rule_set->param_size, + dtlog_i2p(table_id), dtlog_i2p(size)); + + /* Check table size.
*/ + if (rule->is_union) { + if (is_left) param_size = size; + else if (size != param_size) { + sync_error("table param size mismatch in union rule: ", + name->key); + } + } else { + struct dtlog_map_node_s* ksize = + dtlog_hash_get(&table_size, table_name); + + if (ksize == NULL) + dtlog_map_add(&table_size, table_name, dtlog_i2p(size)); + else if (dtlog_p2i(ksize->value) != size) + sync_error("table param size mismatch in join rule: " , + name->key); + } /* If, check table size. */ + + /* Assign index to each param. */ + for (i = 0; i < size;i++) { + dtlog_value_t* param_type = dtlog_array_get(t, i * 2 + 2); + dtlog_value_t* param_value = dtlog_array_get(t, i * 2 + 3); + + if (is_left && strcmp(param_type->value.a, "-") == 0) + sync_error("left cannot have 'ignore': ", name->key); + + if (strcmp(param_type->value.a, "-") == 0) { + dtlog_array_add(param_list, dtlog_i2p(-1)); + + } else if (strcmp(param_type->value.a, "t") == 0) { + if (is_left) left_has_param = true; + dtlog_map_node_t* param_no = + dtlog_hash_get(&param_map, param_value); + + void* no; /* Type is integer. */ + if (param_no == NULL) { + no = dtlog_i2p(param_map.size); + dtlog_map_add(&param_map, param_value, no); + dtlog_map_add(&rule->param_name_map, no, param_value); + } else no = param_no->value; + dtlog_array_add(param_list, no); + + } else if (strcmp(param_type->value.a, "s") == 0) { + if (is_left) left_has_value = true; + + void* c_value = param_value; + dtlog_map_add(&rule->const_param, + dtlog_i2p(const_value), c_value); + dtlog_array_add(param_list, dtlog_i2p(const_value--)); + } + } /* For each table param. */ + is_left = false; + DTLOG_ARRAY_END /* For each table. */ + + dtlog_map_free(&param_map); + if (!left_has_param) sync_error("left must have param: ", name->key); + if (rule->is_union && left_has_value) + sync_error("left cannot have const: ", name->key); + DTLOG_MAP_END /* For each rule. */ + + /* STEP 4: Check param reference.
*/ + DTLOG_MAP_ALL(&rule_set->rules, rule_no) + dtlog_ary_t* left_param = NULL; /* Int array for checking union. */ + + dtlog_rule_t* rule = dtlog_map_get(&rule_set->rules, rule_no->key); + dtlog_value_t* rule_name = dtlog_map_get( + &rule_set->rule_name_map, rule_no->key); + + /* Add table rule map. */ + int32_t i; + + for (i = 1;i < rule->rule.size;i++) { + dtlog_ary_t* set = dtlog_map_get(&rule_set->table_rule_map, + dtlog_array_get(&rule->rule, i)); + + if (set == NULL) { + set = dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + dtlog_map_add(&rule_set->table_rule_map, + dtlog_array_get(&rule->rule, i), set); + } + + int32_t found = dtlog_array_look_for(set, rule_no->key); + if (found < 0) dtlog_array_add(set, rule_no->key); + } + + DTLOG_ARRAY_ALL(&rule->param, param, dtlog_ary_t*) + /* Param is array of integer. */ + if (left_param == NULL) left_param = param; + + if (rule->is_union) { + if (left_param != NULL) { + bool not_found = false; + + DTLOG_ARRAY_ALL(left_param, item, void*) + if (dtlog_array_look_for(param, item) < 0) { + not_found = true; + break; + } + DTLOG_ARRAY_END + + if (not_found) + sync_error("union param not found in ", rule_name); + } + continue; + } + + /* For right side param, it must also appear either in left + * side or right side or being 'ignored'; for left side param, + * it must appear in right side. + */ + DTLOG_ARRAY_ALL((dtlog_ary_t*)param, p0no, void*) + int32_t p0 = dtlog_p2i(p0no); + if (p0 < 0) continue; + + bool found = false; + DTLOG_ARRAY_ALL(&rule->param, param1, dtlog_ary_t*) + if (param == param1) continue; + + if (dtlog_array_look_for(param1, p0no) >= 0) { + found = true; + break; + } + DTLOG_ARRAY_END + + if (!found) + sync_error("not used / undefined param ", rule_name); + DTLOG_ARRAY_END + DTLOG_ARRAY_END /* For each table. */ + DTLOG_MAP_END /* For each rule. */ + + /* STEP 5: Sort table_rule_map. 
*/ + DTLOG_MAP_ALL(&rule_set->table_rule_map, node) + dtlog_ary_t* list = node->value; + dtlog_sort_array(0, list, NULL, NULL); + DTLOG_MAP_END + + dtlog_set_free(&intr); + dtlog_set_free(&topo_in); + dtlog_set_free(&topo_out); + dtlog_map_free(&table_size); + dtlog_array_free(&topo_order); +} + +/* -------------------------------------------------------------------------- + * TABLE OPERATION + * -------------------------------------------------------------------------- + */ + +static dtlog_tuple_t* +tblopr_reorder_tuple(dtlog_tuple_t* t, DTLOG_T(dtlog_ary_t*, int32_t) order, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_value_t*) const_map) +{ + /* Input and output table could be different. Order is represented by + * sequence number instead of param index. + * (a, b, c, d, e) + (2, -2, 1, 4) => (c, const[-2], b, e) + */ + + int32_t i; + dtlog_tuple_t* newt = dtlog_tuple_init(dtlog_array_size(order)); + + for (i = 0;i < dtlog_array_size(order);i++) { + dtlog_value_t* value; + int32_t order_i = dtlog_array_get_int(order, i); + + if (order_i >= 0) value = t->values[order_i]; + else if (order_i < -1) + value = dtlog_map_get(const_map, dtlog_i2p(order_i)); + else dtlog_assert(false); /* Reorder sees constants. */ + + dtlog_value_ref(value); + newt->values[i] = value; + } + + newt->count = t->count; + dtlog_tuple_set_hash_code(newt, dtlog_array_size(order)); + return newt; +} + +static void +tblopr_reorder_table(dtlog_table_t* input, + DTLOG_T(dtlog_ary_t*, int32_t) order, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_value_t*) const_map, + dtlog_table_t* output) +{ + /* Input and output table could be different. 
*/ + DTLOG_SET_ALL(&input->tuples, t, dtlog_tuple_t*) + dtlog_table_add(output, tblopr_reorder_tuple(t, order, const_map)); + DTLOG_SET_END +} + +static bool +tblopr_match_const(dtlog_tuple_t* t, DTLOG_T(dtlog_ary_t*, int32_t) cpos, + dtlog_value_t** cval) +{ + int32_t i; + for (i = 0;i < dtlog_array_size(cpos);i++) { + int32_t ci = dtlog_array_get_int(cpos, i); + if (t->values[ci] != cval[i]) return false; + } + return true; +} + +static dtlog_table_t* +tblopr_query_table(dtlog_table_t* input, DTLOG_T(dtlog_ary_t*, int32_t) param, + dtlog_value_t** val) +{ + /* Input only used to get table param. */ + + dtlog_table_t* output = + dtlog_table_init(NULL, input->table_index, input->num_fields, + 0, input->m.glb_values); + + if (dtlog_array_size(param) == 0) { + DTLOG_SET_ALL(&input->tuples, t, dtlog_tuple_t*) + dtlog_tuple_t* nt = dtlog_tuple_clone(t); + dtlog_table_add(output, nt); + DTLOG_SET_END + return output; + } + + dtlog_ints_t* index = dtlog_int_tuple_init(param); + int32_t idx = dtlog_table_add_index(input, index); + dtlog_int_tuple_free(index); + + dtlog_tuple_t* key = dtlog_tuple_init_val(val, dtlog_array_size(param)); + dtlog_tuple_t* set = dtlog_index_get_index(input, key, idx); + + /* Check set is null? */ + DTLOG_INDEX_ALL(set, idx, t) + dtlog_tuple_t* nt = dtlog_tuple_clone(t); + dtlog_table_add(output, nt); + DTLOG_INDEX_END + + dtlog_tuple_free(key, input->m.glb_values, false); + return output; +} + +static void +tblopr_merge_tuple(dtlog_table_t* tbl, dtlog_tuple_t* tup, + bool negative, bool free_t) +{ + /* Tuple 'tup' untouched after invocation if free_t is false. */ + dtlog_tuple_t* org_tuple = dtlog_set_get(&tbl->tuples, tup); + + if (org_tuple == NULL) { + dtlog_tuple_t* nt = free_t ? 
tup : dtlog_tuple_clone(tup); + dtlog_table_add0(tbl, nt); + } + else { + if (negative) org_tuple->count -= tup->count; + else org_tuple->count += tup->count; + if (free_t) dtlog_tuple_free(tup, tbl->m.glb_values, true); + } +} + +static void +tblopr_merge_table(dtlog_table_t* src, dtlog_table_t* dst, bool negative) +{ + DTLOG_SET_ALL(&src->tuples, t, dtlog_tuple_t*) + tblopr_merge_tuple(dst, t, negative, false); + DTLOG_SET_END +} + +static void +tblopr_final_delta(dtlog_table_t* source, dtlog_table_t* dest) +{ + if (source->is_remove) { + DTLOG_SET_ALL(&source->tuples, t, dtlog_tuple_t*) + dtlog_tuple_t* ot = dtlog_set_get(&dest->tuples, t); + dtlog_table_remove0(dest, ot); + DTLOG_SET_END + } else { + DTLOG_SET_ALL(&source->tuples, t, dtlog_tuple_t*) + dtlog_assert(dtlog_set_get(&dest->tuples, t) == NULL); + dtlog_tuple_t* nt = dtlog_tuple_clone(t); + dtlog_table_add0(dest, nt); + DTLOG_SET_END + } +} + +static void +tblopr_match_reorder_and_merge(dtlog_table_t* input, dtlog_table_t* output, + DTLOG_T(dtlog_ary_t*, int32_t) parami, + DTLOG_T(dtlog_ary_t*, int32_t) paramo, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_value_t*) const_map) +{ + /* Example: output(0, 1, 2) from input(2, 1, -, 'const', 0). + * Input count is ignored. parami/o is param number, not sequence number. 
+ */ + + dtlog_set_t* gv = input->m.glb_values; + DTLOG_T(dtlog_ary_t*, int32_t) cpos = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + DTLOG_T(dtlog_ary_t*, int32_t) order = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + int32_t pos = 0, i; + + for (i = 0;i < dtlog_array_size(parami);i++) { + int32_t c = dtlog_array_get_int(parami, i); + + if (c < -1) dtlog_array_add(cpos, dtlog_i2p(i)); + else if (c == -1) continue; + else { + int32_t op = dtlog_array_look_for( + parami, dtlog_array_get(paramo, pos++)); + if (op >= 0) dtlog_array_add(order, dtlog_i2p(op)); + } + } + + dtlog_value_t** cval = calloc(dtlog_array_size(cpos), sizeof(void*)); + for (i = 0;i < dtlog_array_size(cpos);i++) { + cval[i] = (dtlog_value_t*) + dtlog_map_get(const_map, dtlog_array_get(parami, + dtlog_array_get_int(cpos, i))); + } + + DTLOG_SET_ALL(&input->tuples, t, dtlog_tuple_t*) + if (tblopr_match_const(t, cpos, cval)) { + dtlog_tuple_t* nt = tblopr_reorder_tuple(t, order, NULL); + dtlog_table_add_extra(output, nt); + } + DTLOG_SET_END + + free(cval); + dtlog_array_free(cpos); + dtlog_array_free(order); +} + +static void +tblopr_combine_tuple(dtlog_table_t* res, int32_t tuple1_count, + dtlog_join_param_t* joinp, int r1_fnum, + dtlog_tuple_t* tuple2, dtlog_value_t** tuple_values) +{ + int32_t i; + for (i = 0;i < dtlog_array_size(&joinp->rem2);i++) { + tuple_values[i + r1_fnum] = tuple2->values[ + dtlog_p2i(dtlog_array_get(&joinp->rem2, i))]; + } + + int32_t res_num_fields = res->num_fields; + dtlog_tuple_t* nt = dtlog_tuple_init_val(tuple_values, res->num_fields); + + nt->count = tuple1_count; + for (i = 0;i < res_num_fields;i++) dtlog_value_ref(nt->values[i]); + tblopr_merge_tuple(res, nt, false, true); +} + +static dtlog_table_t* +tblopr_cond_join(dtlog_table_t* t1, dtlog_table_t* t2, + dtlog_join_param_t* joinp) +{ + /* 't1' is intermediate table during join; 't2' is original table. 
+ * Select values joinp.select1 from t1 and const, and match that with + * params indicated by joinp.index2. + */ + dtlog_set_t* gv = t1->m.glb_values; + + int32_t r1_fnum = dtlog_array_size(&joinp->rem1); + int32_t r2_fnum = dtlog_array_size(&joinp->rem2); + + dtlog_table_t* res = dtlog_table_init(NULL, -1, r1_fnum + r2_fnum, 0, gv); + int32_t t2_index = dtlog_map_get_int(&t2->index_def, joinp->index2); + int32_t t1val_sz = dtlog_array_size(&joinp->select1); + + DTLOG_T(dtlog_ary_t*, int32_t) t1param = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_INT32), t1val_sz, gv); + dtlog_tuple_t* key_tuple = dtlog_tuple_init_val(NULL, t1val_sz); + + int32_t i; + for (i = 0;i < t1val_sz;i++) { + dtlog_value_t* obj = dtlog_array_get(&joinp->select1, i); + if (obj != NULL) { + dtlog_array_add(t1param, dtlog_i2p(-1)); /* Mark as not set. */ + key_tuple->values[i] = obj; + } else { + dtlog_array_add(t1param, dtlog_array_get(&joinp->select1i, i)); + } + } + + /* Loop over t1 and join. */ + dtlog_value_t** tuple_values = calloc(sizeof(void*), res->num_fields); + DTLOG_SET_ALL(&t1->tuples, tuple1, dtlog_tuple_t*) + + for (i = 0;i < dtlog_array_size(t1param);i++) { + int32_t t1p = dtlog_p2i(dtlog_array_get(t1param, i)); + if (t1p < 0) continue; + key_tuple->values[i] = tuple1->values[t1p]; + } + + dtlog_tuple_set_hash_code(key_tuple, t1val_sz); + dtlog_tuple_t* match_tuples = + dtlog_index_get_index(t2, key_tuple, t2_index); + + if (match_tuples != NULL) { + for (i = 0;i < r1_fnum;i++) { + tuple_values[i] = + tuple1->values[dtlog_p2i(dtlog_array_get(&joinp->rem1, i))]; + } + + /* Join the value. */ + DTLOG_INDEX_ALL(match_tuples, t2_index, tuple2) + tblopr_combine_tuple(res, tuple1->count, + joinp, r1_fnum, tuple2, tuple_values); + DTLOG_INDEX_END + } /* If, having match tuples. 
*/ + DTLOG_SET_END + + dtlog_tuple_free(key_tuple, t1->m.glb_values, false); + dtlog_array_free(t1param); + free(tuple_values); + return res; +} + +static void +tblopr_gen_delta(dtlog_table_t* source, dtlog_table_t* dest) +{ + bool is_remove = source->is_remove; + dtlog_set_t* gv = dest->m.glb_values; + + DTLOG_SET_ALL(&source->tuples, st, dtlog_tuple_t*) + dtlog_assert(st->indexes == NULL); + + int64_t st_count = st->count; + dtlog_tuple_t* dt = dtlog_set_get(&dest->tuples, st); + + if (is_remove) { + dtlog_assert(dt != NULL && dt->count >= st_count); + if (dt->count > st_count) { + dt->count -= st_count; + DTLOG_SET_DEL_ITEM; + dtlog_tuple_free(st, gv, true); + } + } else { + if (dt != NULL) { + dt->count += st_count; + DTLOG_SET_DEL_ITEM; + dtlog_tuple_free(st, gv, true); + } + } + DTLOG_SET_END +} + +static void +tblopr_merge_delta(dtlog_table_t* source, dtlog_table_t* dest, + dtlog_table_t* dest_ivt) +{ + + /* 'dest' is of the same operation as 'source', while dest_ivt + * is opposite. */ + + dtlog_assert(source->table_index == dest->table_index); + dtlog_set_t* gv = dest->m.glb_values; + + DTLOG_SET_ALL(&source->tuples, st, dtlog_tuple_t*) + dtlog_assert(st->indexes == NULL); + dtlog_tuple_t* dt = dtlog_set_get(&dest->tuples, st); + + if (dt != NULL) { + DTLOG_SET_DEL_ITEM; + dt->count += st->count; /* Count will not change hash. */ + dtlog_tuple_free(st, gv, true); + /* Will not match opposite. */ + } else { + + dtlog_tuple_t* dt_ivt = dtlog_set_get(&dest_ivt->tuples, st); + /* will be added later for dt_ivt == NULL. */ + if (dt_ivt != NULL) { + /* Cross merge. */ + long st_count = st->count; + dt_ivt->count -= st_count; + + if (dt_ivt->count >= 0) { + DTLOG_SET_DEL_ITEM; + dtlog_tuple_free(st, gv, true); + } + + if (dt_ivt->count == 0) dtlog_table_remove0(dest_ivt, dt_ivt); + else if (dt_ivt->count < 0) { /* Move to opposite table. 
*/ + dt_ivt->count = -dt_ivt->count; + dtlog_tuple_t* nt = dtlog_tuple_clone(dt_ivt); + dtlog_table_add(dest, nt); + dtlog_table_remove0(dest_ivt, dt_ivt); + } + } + } + DTLOG_SET_END /* For source tuple. */ + + /* Add remaining. */ + DTLOG_SET_ALL(&source->tuples, t, dtlog_tuple_t*) + dtlog_tuple_t* nt = dtlog_tuple_clone(t); + dtlog_table_add0(dest, nt); + DTLOG_SET_END +} + +static dtlog_table_t* +tblopr_full_join(dtlog_table_t* t1, dtlog_table_t* t2, + dtlog_join_param_t* joinp) +{ + /* 'select1' contains only constants. */ + + int32_t r1_fnum = dtlog_array_size(&joinp->rem1); + dtlog_table_t* res = dtlog_table_init( + NULL, -1, r1_fnum + dtlog_array_size(&joinp->rem2), + 0, t1->m.glb_values); + + int32_t i; + dtlog_value_t** tuple_values = calloc(sizeof(void*), res->num_fields); + + if (joinp->index2 == NULL) { + DTLOG_SET_ALL(&t1->tuples, tuple1, dtlog_tuple_t*) + for (i = 0;i < r1_fnum;i++) { + tuple_values[i] = tuple1->values[ + dtlog_p2i(dtlog_array_get(&joinp->rem1, i))]; + } + + DTLOG_SET_ALL(&t2->tuples, tuple2, dtlog_tuple_t*) + tblopr_combine_tuple(res, tuple1->count, + joinp, r1_fnum, tuple2, tuple_values); + DTLOG_SET_END + DTLOG_SET_END + + } else { + int32_t t2_index = dtlog_map_get_int(&t2->index_def, joinp->index2); + int t1val_sz = dtlog_array_size(&joinp->select1); + dtlog_value_t** t1val = calloc(sizeof(void*), t1val_sz); + + for (i = 0;i < t1val_sz;i++) { + t1val[i] = dtlog_array_get(&joinp->select1, i); + } + + dtlog_tuple_t* key_tuple = dtlog_tuple_init_val(t1val, t1val_sz); + dtlog_tuple_t* match_tuples = dtlog_index_get_index( + t2, key_tuple, t2_index); + + free(t1val); + dtlog_tuple_free(key_tuple, t1->m.glb_values, false); + + DTLOG_SET_ALL(&t1->tuples, tuple1, dtlog_tuple_t*) + for (i = 0;i < r1_fnum;i++) { + tuple_values[i] = tuple1->values[ + dtlog_p2i(dtlog_array_get(&joinp->rem1, i))]; + } + + /* Join the value. 
*/ + DTLOG_INDEX_ALL(match_tuples, t2_index, tuple2) + tblopr_combine_tuple(res, tuple1->count, + joinp, r1_fnum, tuple2, tuple_values); + DTLOG_INDEX_END + DTLOG_SET_END + } + + free(tuple_values); + return res; /* Check is empty? */ +} + +dtlog_table_t* +dtlog_tblopr_join(dtlog_table_t* t1, dtlog_table_t* t2, + dtlog_join_param_t* joinp) +{ + if (joinp->index2 != NULL && + !dtlog_map_has(&t2->index_def, joinp->index2)) { + dtlog_table_add_index(t2, joinp->index2); + } + + if (joinp->full_join) return tblopr_full_join(t1, t2, joinp); + else return tblopr_cond_join(t1, t2, joinp); +} + +/* -------------------------------------------------------------------------- + * DATALOG ENGINE + * -------------------------------------------------------------------------- + */ + +dtlog_join_param_t* +dtlog_join_param_init(dtlog_join_param_t* jp, + dtlog_ints_t* i2, dtlog_set_t* gv) +{ + dtlog_coll_alloc(&jp, sizeof(dtlog_join_param_t), DTLOG_T_JOIN_PARAM, gv); + + jp->full_join = false; + jp->index2 = i2; + dtlog_array_init(&jp->select1, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + dtlog_array_init(&jp->select1i, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + dtlog_array_init(&jp->rem1, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + dtlog_array_init(&jp->rem2, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + dtlog_array_init(&jp->out_param, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + return jp; +} + +void +dtlog_join_param_free(dtlog_join_param_t* jp) +{ + dtlog_array_free(&jp->select1); + dtlog_array_free(&jp->select1i); + dtlog_array_free(&jp->rem1); + dtlog_array_free(&jp->rem2); + dtlog_array_free(&jp->out_param); + if (jp->index2 != NULL) dtlog_int_tuple_free(jp->index2); + dtlog_coll_free_ptr(jp); +} + +static dtlog_ary_t* +eng_get_cond_param(dtlog_ary_t* param) +{ + /* Input and output are array of integers. 
*/ + + int32_t i; + dtlog_ary_t* res = dtlog_array_init(NULL, + DTLOG_KEY(DTLOG_T_INT32), 0, param->m.glb_values); + + for (i = 0;i < dtlog_array_size(param);i++) { + int32_t p = dtlog_array_get_int(param, i); + if (p >= 0) dtlog_array_add(res, dtlog_i2p(p)); + } + return res; +} + +static bool +eng_check_will_use(int32_t p, DTLOG_T(dtlog_ary_t*, int32_t) not_used, + DTLOG_T(dtlog_ary_t*, int32_t) reorder_list, + dtlog_rule_t* rule) +{ + int32_t i; + for (i = 0;i < dtlog_array_size(not_used);i++) { + if (dtlog_p2i(dtlog_array_get(not_used, i)) < 0) continue; + + dtlog_ary_t* a = (dtlog_ary_t*)dtlog_array_get(&rule->param, + dtlog_array_get_int(reorder_list, i)); + if (dtlog_array_look_for(a, dtlog_i2p(p)) >= 0) return true; + } + return false; +} + +static dtlog_bits_t* +eng_gen_bitset(DTLOG_T(dtlog_ary_t*, int32_t) inp) +{ + int32_t i; + dtlog_bits_t* b = dtlog_bitset_init(NULL); + + for (i = 0;i < dtlog_array_size(inp);i++) { + int32_t idx = dtlog_array_get_int(inp, i); + if (idx >= 0) dtlog_bitset_set(b, idx); + } + return b; +} + +static int32_t +eng_get_joinable(dtlog_rule_t* rule, DTLOG_T(dtlog_ary_t*, int32_t) cur_param, + DTLOG_T(dtlog_ary_t*, int32_t) table_sz, + DTLOG_T(dtlog_ary_t*, int32_t) reorder) +{ + /* Returns the sequence id of the table to be joined. */ + + int32_t i, tb_index; + dtlog_bits_t* bitset1 = eng_gen_bitset(cur_param); + + for (i = 1;i < dtlog_array_size(table_sz);i++) { + /* This indicates it has joined. */ + if (dtlog_p2i(dtlog_array_get(table_sz, i)) < 0) continue; + tb_index = dtlog_p2i(dtlog_array_get(reorder, i)); + + dtlog_bits_t* bitset2 = eng_gen_bitset( + dtlog_array_get(&rule->param, tb_index)); + + dtlog_bitset_and(bitset2, bitset1); + bool empty = dtlog_bitset_empty(bitset2); + dtlog_bitset_free(bitset2); + + if (!empty) { + dtlog_bitset_free(bitset1); + return i; /* Conditional join. 
*/ + } + } + + dtlog_bitset_free(bitset1); + for (i = 1;i < dtlog_array_size(table_sz);i++) { + if (dtlog_p2i(dtlog_array_get(table_sz, i)) >= 0) return i; + /* Full join. */ + } + return -1; +} + +static dtlog_join_param_t* +eng_gen_join_param(DTLOG_T(dtlog_ary_t*, int32_t) param1, + DTLOG_T(dtlog_ary_t*, int32_t) param2, + DTLOG_T(dtlog_ary_t*, int32_t) not_used, + DTLOG_T(dtlog_ary_t*, int32_t) reorder, + dtlog_rule_t* rule) +{ + /* Examples of join parameters: when it is conditional join: + * p1(7, 3, 2) + p2(2, 3, -1, -3, 6) => out param(7, 2, 6) + * 'join' params: select1(2, 1, v[-3]), index2(0, 1, 3) + * 'keep' params: rem1(0, 2), rem2(4) + * + * When it is full join: + * p1(7, 2, 6) + p2(-4, 9, -1) => out param(7, 2, 6, 9) + * 'join' params: select1(v[-4]), index2(0) + * 'keep' params: rem1(0, 1), rem2(1) + */ + + dtlog_join_param_t* joinp = dtlog_join_param_init( + NULL, /*idx2*/NULL, rule->m.glb_values); + + DTLOG_T(dtlog_ary_t*, int32_t) index2 = + dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), + 0, param1->m.glb_values); + + dtlog_bits_t* join_set = eng_gen_bitset(param1); + dtlog_bits_t* param2_set = eng_gen_bitset(param2); + + int32_t i; + dtlog_bitset_and(join_set, param2_set); + joinp->full_join = true; + + /* Set select1 and index2. */ + for (i = 0;i < dtlog_array_size(param2);i++) { + int32_t p2 = dtlog_array_get_int(param2, i); + + if (p2 < -1) { + dtlog_array_add(index2, dtlog_i2p(i)); + dtlog_value_t* v = (dtlog_value_t*) + dtlog_map_get(&rule->const_param, dtlog_i2p(p2)); + + dtlog_array_add(&joinp->select1, v); + dtlog_array_add(&joinp->select1i, 0); + + } else if (p2 >= 0 && dtlog_bitset_get(join_set, p2)) { + int32_t pos1 = dtlog_array_look_for(param1, dtlog_i2p(p2)); + dtlog_array_add(index2, dtlog_i2p(i)); + + dtlog_array_add(&joinp->select1, NULL); + dtlog_array_add(&joinp->select1i, dtlog_i2p(pos1)); + joinp->full_join = false; + } + } + + /* Set rem1. 
*/ + for (i = 0;i < dtlog_array_size(param1);i++) { + /* If in left or right, excluding processed tables, keep them. */ + int32_t p1 = dtlog_array_get_int(param1, i); + if (!eng_check_will_use(p1, not_used, reorder, rule)) continue; + dtlog_array_add(&joinp->rem1, dtlog_i2p(i)); + dtlog_array_add(&joinp->out_param, dtlog_i2p(p1)); + } + + /* Set rem2. */ + for (i = 0;i < dtlog_array_size(param2);i++) { + /* Exclude all join set (because it is included in param1). + * If in left or right, excluding processed tables, keep them. + */ + + int32_t p2 = dtlog_array_get_int(param2, i); + if (p2 <= -1) continue; + if (dtlog_bitset_get(join_set, p2)) continue; + if (!eng_check_will_use(p2, not_used, reorder, rule)) continue; + + dtlog_array_add(&joinp->rem2, dtlog_i2p(i)); + dtlog_array_add(&joinp->out_param, dtlog_i2p(p2)); + } + + int32_t sz2 = dtlog_array_size(index2); + joinp->index2 = sz2 > 0 ? dtlog_int_tuple_init(index2) : NULL; + + dtlog_array_free(index2); + dtlog_bitset_free(join_set); + dtlog_bitset_free(param2_set); + return joinp; +} + +dtlog_engine_t* +dtlog_eng_parse(const char* rules, dtlog_set_t* gv) +{ + dtlog_engine_t* eng = dtlog_engine_init(NULL, gv); + DTLOG_T(dtlog_map_t, dtlog_ary_t*) sem; + + dtlog_sync_init(rules, gv); + dtlog_sync_parse(&sem); + dtlog_sem_process(&eng->rule_set, &sem); + dtlog_map_free(&sem); + + /* Create tables. 
*/ + DTLOG_MAP_ALL(&eng->rule_set.param_size, rule) + int32_t tsize = dtlog_p2i(rule->value); + if (dtlog_map_has(&eng->rule_set.input_tables, rule->key) || + !dtlog_map_has(&eng->rule_set.output_tables, rule->key)) { + + dtlog_map_add(&eng->tables, rule->key, + dtlog_table_init(NULL, dtlog_p2i(rule->key), tsize, 0, gv)); + } + DTLOG_MAP_END + return eng; +} + +static dtlog_table_t* +eng_create_table(dtlog_engine_t* eng, int32_t tbl_idx, bool is_remove) +{ + dtlog_set_t* gv = eng->m.glb_values; + int32_t tbl_fd = dtlog_map_get_int(&eng->rule_set.param_size, + dtlog_i2p(tbl_idx)); + dtlog_table_t* t = dtlog_table_init(NULL, tbl_idx, tbl_fd, 0, gv); + t->is_remove = is_remove; + return t; +} + +static void +eng_put_tuple(dtlog_engine_t* eng, dtlog_tuple_t* tuple, + bool is_remove, int32_t tbl_idx, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) remove, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) insert) +{ + dtlog_table_t* tbl = dtlog_map_get( + is_remove ? remove : insert, dtlog_i2p(tbl_idx)); + if (!tbl) { + tbl = eng_create_table(eng, tbl_idx, is_remove); + dtlog_map_add(is_remove ? remove : insert, dtlog_i2p(tbl_idx), tbl); + } + dtlog_table_add(tbl, tuple); +} + +static void +eng_check_put_tuple(dtlog_engine_t* eng, dtlog_tuple_t* tuple, + bool is_remove, int32_t tbl_idx, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) remove, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) insert) +{ + dtlog_set_t* gv = eng->m.glb_values; + if (is_remove) { + dtlog_table_t* tbl1 = dtlog_map_get(insert, dtlog_i2p(tbl_idx)); + dtlog_tuple_t* tuple1 = tbl1 == NULL ? NULL : + dtlog_set_get(&tbl1->tuples, tuple); + + if (tuple1) { /* In opposite delta table. 
*/ + dtlog_table_remove(tbl1, tuple1); + dtlog_tuple_free(tuple, gv, true); + return; + } + + tbl1 = dtlog_map_get(&eng->tables, dtlog_i2p(tbl_idx)); + tuple1 = dtlog_set_get(&tbl1->tuples, tuple); + + if (!tuple1) { + dtlog_tuple_free(tuple, gv, true); + return; + } + } else { + dtlog_table_t* tbl1 = dtlog_map_get(remove, dtlog_i2p(tbl_idx)); + dtlog_tuple_t* tuple1 = tbl1 == NULL ? NULL : + dtlog_set_get(&tbl1->tuples, tuple); + + if (tuple1) { /* In opposite delta table. */ + dtlog_table_remove(tbl1, tuple1); + dtlog_tuple_free(tuple, gv, true); + return; + } + + tbl1 = dtlog_map_get(&eng->tables, dtlog_i2p(tbl_idx)); + tuple1 = dtlog_set_get(&tbl1->tuples, tuple); + + if (tuple1) { + dtlog_tuple_free(tuple, gv, true); + return; + } + } + eng_put_tuple(eng, tuple, is_remove, tbl_idx, remove, insert); +} + +static void +eng_align_tables(dtlog_engine_t* eng, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) inp_remove, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) inp_insert) +{ + dtlog_ary_t* del_ids = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_INT32), 0, eng->m.glb_values); + dtlog_ary_t* add_ids = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_INT32), 0, eng->m.glb_values); + + DTLOG_ARRAY_ALL(inp_remove, tbl, dtlog_table_t*) + dtlog_array_add(del_ids, dtlog_i2p(tbl->table_index)); + DTLOG_ARRAY_END + + DTLOG_ARRAY_ALL(inp_insert, tbl, dtlog_table_t*) + dtlog_array_add(add_ids, dtlog_i2p(tbl->table_index)); + DTLOG_ARRAY_END + + dtlog_sort_array(0, del_ids, inp_remove, NULL); + dtlog_sort_array(0, add_ids, inp_insert, NULL); + + int del_i, add_i; + for (del_i = 0, add_i = 0; + del_i < dtlog_array_size(del_ids) || + add_i < dtlog_array_size(add_ids);) { + + bool align_add = false; + bool align_del = false; + + if (del_i < dtlog_array_size(del_ids) && + add_i < dtlog_array_size(add_ids)) { + + if (dtlog_array_get(del_ids, del_i) == + dtlog_array_get(add_ids, add_i)) { + add_i++; del_i++; continue; + + } else if (dtlog_array_get(del_ids, del_i) < + dtlog_array_get(add_ids, add_i)) 
{ + align_add = true; + } else { + align_del = true; + } + } else if (del_i < dtlog_array_size(del_ids)) { + align_add = true; + } else { + align_del = true; + } + + if (align_add) { + dtlog_table_t* org = dtlog_array_get(inp_remove, del_i); + dtlog_table_t* dummy = dtlog_table_init( + NULL, org->table_index, org->num_fields, + 0, eng->m.glb_values); + + dtlog_array_ins(inp_insert, del_i, dummy); + dtlog_array_ins(add_ids, del_i, NULL); + add_i++; del_i++; + } + + if (align_del) { + dtlog_table_t* org = dtlog_array_get(inp_insert, add_i); + dtlog_table_t* dummy = dtlog_table_init( + NULL, org->table_index, org->num_fields, + 0, eng->m.glb_values); + dummy->is_remove = true; + + dtlog_array_ins(inp_remove, add_i, dummy); + dtlog_array_ins(del_ids, add_i, NULL); + add_i++; del_i++; + } + } + + dtlog_array_free(del_ids); + dtlog_array_free(add_ids); +} + +static bool +eng_invoke_external(dtlog_engine_t* eng, dtlog_table_t* input, + dtlog_table_t* del_output, dtlog_table_t* add_output) +{ + /* 'del_output' and 'add_output' have the same table index. */ + if (eng->ext_func == NULL) return false; + return (*eng->ext_func)(eng, input, del_output, add_output); +} + +void +dtlog_eng_set_ext_func(dtlog_engine_t* eng, void* func) +{ + eng->ext_func = func; +} + +int32_t +dtlog_get_table_id(dtlog_engine_t* eng, const char* name, int32_t len) +{ + dtlog_set_t* gv = eng->m.glb_values; + dtlog_value_t* table_name = dtlog_value_init(name, len, gv); + dtlog_map_node_t* node = dtlog_hash_get( + &eng->rule_set.rule_index_map, table_name); + + dtlog_value_free(table_name, eng->m.glb_values); + if (node == NULL) return -1; + return dtlog_p2i(node->value); +} + +dtlog_table_t* +dtlog_get_org_table(dtlog_engine_t* eng, dtlog_table_t* t) +{ + return dtlog_map_get(&eng->tables, dtlog_i2p(t->table_index)); +} + +dtlog_tuple_t* +dtlog_query_on0(dtlog_engine_t* eng, int32_t tid, dtlog_value_t* value) +{ + /* Use DTLOG_INDEX_ALL to iterate on return value. 
*/ + dtlog_tuple_t* qt = dtlog_tuple_init(1); + qt->values[0] = value; + + DTLOG_T(dtlog_ary_t*, int32_t) ints = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_INT32), 0, eng->m.glb_values); + + dtlog_array_add(ints, 0); + dtlog_ints_t* key0 = dtlog_int_tuple_init(ints); + dtlog_array_free(ints); + + dtlog_table_t* orgt = dtlog_map_get(&eng->tables, dtlog_i2p(tid)); + int32_t idx_i = dtlog_table_add_index(orgt, key0); + dtlog_tuple_t* list = dtlog_index_get_index(orgt, qt, idx_i); + + dtlog_tuple_free(qt, eng->m.glb_values, true); + dtlog_int_tuple_free(key0); + return list; +} + +static DTLOG_T(dtlog_ary_t*, dtlog_table_t*) +eng_query_table(dtlog_engine_t* eng, dtlog_table_t* table, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) all) +{ + /* NULL value indicates not to compare that field. */ + + dtlog_set_t* gv = eng->m.glb_values; + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) res = + dtlog_array_init(NULL, 0 /* DTLOG_T_TABLE */, 0, gv); + /* No type presented so freeing dtlog_eng_query will leave tables + * untouched. */ + + int32_t i, j; + dtlog_value_t** val = calloc(table->num_fields, sizeof(void*)); + dtlog_table_t* org = dtlog_map_get(all, dtlog_i2p(table->table_index)); + + DTLOG_SET_ALL(&table->tuples, tval, dtlog_tuple_t*) + DTLOG_T(dtlog_ary_t*, int32_t) param = + dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + + for (i = j = 0;i < table->num_fields;i++) { + /* NULL value only used for query. 
*/ + if (tval->values[i] == NULL) continue; + dtlog_array_add(param, dtlog_i2p(i)); + val[j++] = tval->values[i]; + } + + dtlog_table_t* res_tbl = tblopr_query_table(org, param, val); + dtlog_array_add(res, res_tbl); + dtlog_array_free(param); + DTLOG_SET_END + + free(val); + return res; +} + +DTLOG_T(dtlog_ary_t*, dtlog_table_t*) +dtlog_eng_query(dtlog_engine_t* eng, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) input) +{ + dtlog_ary_t* res = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_TABLE), 0, eng->m.glb_values); + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) res1; + + DTLOG_MAP_ALL(input, node) + res1 = eng_query_table(eng, (dtlog_table_t*)node->value, + &eng->tables); + + DTLOG_ARRAY_ALL(res1, tbl, dtlog_table_t*) + dtlog_array_add(res, tbl); + DTLOG_ARRAY_END + + dtlog_array_free(res1); + DTLOG_MAP_END + return res; +} + +static int32_t +eng_get_table_index(dtlog_rule_t* rule, int32_t param) +{ + return dtlog_array_look_for(&rule->rule, dtlog_i2p(param)); +} + +static int32_t +eng_search_tbl_in_rules(dtlog_table_t* tbl, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) inp_tables) +{ + int32_t idx = 0; + DTLOG_ARRAY_ALL(inp_tables, t, dtlog_table_t*) + if (t->table_index == tbl->table_index) return idx; + idx++; + DTLOG_ARRAY_END + return -1; +} + +static dtlog_table_t* +eng_reset_count(dtlog_table_t* input) +{ + dtlog_table_t* output = dtlog_table_init(NULL, input->table_index, + input->num_fields, input->tuples.size, input->m.glb_values); + output->is_remove = input->is_remove; + + DTLOG_SET_ALL(&input->tuples, t, dtlog_tuple_t*) + dtlog_tuple_t* tup = dtlog_tuple_clone(t); + tup->count = 1; + dtlog_table_add0(output, tup); + DTLOG_SET_END + return output; +} + +static bool +eng_merge_output(dtlog_engine_t* eng, + DTLOG_T(dtlog_ary_t*, int32_t) first_rule, + dtlog_table_t* out_d_del, dtlog_table_t* out_d_add, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) inp_del_tables, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) inp_add_tables) +{ + /* Returns true if input table will be used 
later. */ + + if (DTLOG_LOG_COMP) { + char buf[8192]; int32_t pos; + pos = 0; pos = dtlog_table_print(buf, pos, out_d_del, false); + buf[pos] = 0; printf("[LOG] merge_out - %s\n", buf); + pos = 0; pos = dtlog_table_print(buf, pos, out_d_add, false); + buf[pos] = 0; printf("[LOG] merge_out + %s\n", buf); + } + + if (dtlog_table_size(out_d_del) == 0 && dtlog_table_size(out_d_add) == 0) + return false; + + int32_t ipos = eng_search_tbl_in_rules(out_d_del, inp_del_tables); + if (ipos >= 0) { + tblopr_merge_delta(out_d_del, + dtlog_array_get(inp_del_tables, ipos), + dtlog_array_get(inp_add_tables, ipos)); + + tblopr_merge_delta(out_d_add, + dtlog_array_get(inp_add_tables, ipos), + dtlog_array_get(inp_del_tables, ipos)); + return false; + } + + int32_t new_inp_no = dtlog_map_has( + &eng->rule_set.output_tables, dtlog_i2p(out_d_del->table_index)) + ? 1000000 : /* just a big number */ + dtlog_array_get_int(dtlog_map_get(&eng->rule_set.table_rule_map, + dtlog_i2p(out_d_del->table_index)), 0); + + dtlog_insert_item(new_inp_no, first_rule, + out_d_del, out_d_add, inp_del_tables, inp_add_tables); + return true; +} + +void +dtlog_eng_do_union(dtlog_engine_t* eng, + dtlog_table_t* input, dtlog_table_t* output) +{ + /* Example: (0, 1, 2) > (2, 1, -, 'const', 0), (0, -, 'aa', 2, 1). */ + + dtlog_rule_t* rule = dtlog_map_get( + &eng->rule_set.rules, dtlog_i2p(output->table_index)); + + tblopr_match_reorder_and_merge(input, output, + dtlog_array_get(&rule->param, + eng_get_table_index(rule, input->table_index)), + dtlog_array_get(&rule->param, + eng_get_table_index(rule, output->table_index)), + &rule->const_param); +} + +void +dtlog_eng_do_join(dtlog_engine_t* eng, + dtlog_table_t* input, dtlog_table_t* output) +{ + /* Example: (7, 6, 2) : (7, 3, -1, -2, 2), (2, 3, -1, -3, 6). + * Initial process: (7, 3, -1, -2, 2) => (7, 3, 2). 
+ * drop 'ignored' param, match constants and merge identical; + * Repeated joins - with conditional having a higher priority than + * full join, possible with constants; the first join could be + * self join. + * Final process: reorder based on left (7, 6, 2). + * Change intermediate tables from hash table to list? + * + * Join is carried out in order of table size, smallest first. + * Table size being -1 indicates it has been joined before. + */ + + dtlog_set_t* gv = eng->m.glb_values; + dtlog_rule_t* rule = dtlog_map_get( + &eng->rule_set.rules, dtlog_i2p(output->table_index)); + + dtlog_ary_t* table_sz = + dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + dtlog_ary_t* rule_reorder = /* Remove later. */ + dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + dtlog_ary_t* natural_order = + dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + + /* STEP 0: sort based on table size and get the first join pair. */ + dtlog_array_add(table_sz, 0); + dtlog_array_add(natural_order, 0); + dtlog_array_add(rule_reorder, dtlog_array_get(&rule->rule, 0)); + + int32_t i; + for (i = 1;i < dtlog_array_size(&rule->rule);i++) { + + int32_t p = dtlog_array_get_int(&rule->rule, i); + dtlog_array_add(table_sz, dtlog_i2p( + dtlog_table_size((dtlog_table_t*)dtlog_map_get(&eng->tables, + dtlog_i2p(p))))); + + dtlog_array_add(natural_order, dtlog_i2p(i)); + dtlog_array_add(rule_reorder, dtlog_i2p(p)); + } + + /* Starting from 1 for skipping left rule. */ + dtlog_sort_array(1, table_sz, rule_reorder, natural_order); + + /* STEP 1: merge, reorder and merge for the input. */ + dtlog_table_t* table1 = NULL; + DTLOG_T(dtlog_ary_t*, int32_t) param1 = NULL; + + int32_t join1_idx = dtlog_array_look_for( + rule_reorder, dtlog_i2p(input->table_index)); + + if (join1_idx < dtlog_array_size(rule_reorder) - 1 && + input->table_index == + dtlog_array_get_int(rule_reorder, join1_idx + 1)) { + + /* Do self join. 
Only one self join is supported as doing more will + * need full table join and less efficient - use intermediate + * table instead for this case. + * Example: X(b, a, c) : x(a, b, 'v', -, c) x(b, 'w', -, a, c). + */ + + /* -1 for marking as used */ + dtlog_array_set(table_sz, join1_idx, dtlog_i2p(-1)); + dtlog_array_set(table_sz, join1_idx + 1, dtlog_i2p(-1)); + + /* 'pos' is based on rule sequence. */ + int32_t pos1 = dtlog_array_look_for( + &rule->rule, dtlog_i2p(input->table_index)); + int32_t pos2 = dtlog_array_look_for( + &rule->rule, dtlog_i2p(input->table_index)) + 1; + + DTLOG_T(dtlog_ary_t*, int32_t) param1Org = + dtlog_array_get(&rule->param, pos1); + /* (a, b, 'v', -, c) */ + DTLOG_T(dtlog_ary_t*, int32_t) param2Org = + dtlog_array_get(&rule->param, pos2); + /* (b, 'w', -, a, c) */ + + param1 = eng_get_cond_param(param1Org); + /* (a, b, c) */ + DTLOG_T(dtlog_ary_t*, int32_t) param2 = + eng_get_cond_param(param2Org); + /* (b, a, c) */ + + dtlog_table_t* table0 = dtlog_map_get(&eng->tables, + dtlog_i2p(input->table_index)); + + /* original table */ + table1 = dtlog_table_init(NULL, -1, dtlog_array_size(param1), 0, gv); + dtlog_table_t* table2 = dtlog_table_init( + NULL, -1, dtlog_array_size(param1), 0, gv); + + tblopr_match_reorder_and_merge( + input, table1, param1Org, param1, &rule->const_param); + tblopr_match_reorder_and_merge( + input, table2, param2Org, param2, &rule->const_param); + + dtlog_join_param_t* joinp1 = eng_gen_join_param( + param1, param2Org, table_sz, natural_order, rule); + dtlog_join_param_t* joinp2 = eng_gen_join_param( + param2, param1Org, table_sz, natural_order, rule); + + dtlog_table_t* out_tb1 = dtlog_tblopr_join(table1, table0, joinp1); + /* outp: (a, b, c) */ + dtlog_table_t* out_tb2 = dtlog_tblopr_join(table1, input, joinp1); + /* outp: (a, b, c) */ + dtlog_table_t* out_tb3 = dtlog_tblopr_join(table2, table0, joinp2); + /* outp: (b, a, c) */ + + /* Reorder out_tb3. It should only differ in order with out_tb1, 2. 
*/ + dtlog_ary_t* reorder = + dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + + DTLOG_ARRAY_ALL_INT(&joinp1->out_param, pos) + dtlog_array_add(reorder, dtlog_i2p( + dtlog_array_look_for(&joinp2->out_param, dtlog_i2p(pos)))); + DTLOG_ARRAY_END + + dtlog_table_t* out_tb4 = + dtlog_table_init(NULL, -1, out_tb3->num_fields, 0, gv); + tblopr_reorder_table(out_tb3, reorder, NULL, out_tb4); + + /* Merge out_tb1, 2, and 4. */ + dtlog_table_free(table1); + table1 = dtlog_table_init(NULL, -1, out_tb1->num_fields, 0, gv); + + tblopr_merge_table(out_tb1, table1, false); + tblopr_merge_table(out_tb4, table1, false); + tblopr_merge_table(out_tb2, table1, input->is_remove); + + dtlog_array_free(param1); + param1 = dtlog_array_clone(&joinp1->out_param); + + dtlog_table_free(table2); + dtlog_table_free(out_tb1); + dtlog_table_free(out_tb2); + dtlog_table_free(out_tb3); + dtlog_table_free(out_tb4); + + dtlog_array_free(reorder); + dtlog_array_free(param2); + dtlog_join_param_free(joinp1); + dtlog_join_param_free(joinp2); + } else { + int32_t pos1 = dtlog_array_look_for(&rule->rule, + dtlog_i2p(input->table_index)); + DTLOG_T(dtlog_ary_t*, int32_t) param1_org = + dtlog_array_get(&rule->param, pos1); + param1 = eng_get_cond_param(param1_org); + + table1 = dtlog_table_init(NULL, -1, dtlog_array_size(param1), 0, gv); + tblopr_match_reorder_and_merge( + input, table1, param1_org, param1, &rule->const_param); + + /* Use -1 to mark 'used' */ + dtlog_array_set(table_sz, join1_idx, dtlog_i2p(-1)); + } + + /* STEP 2: repeated join, first condition join, then full join. */ + for (;;) { /* Join loop, the iteration param is param1 and table1. */ + + int32_t join2_idx = eng_get_joinable( + rule, param1, table_sz, natural_order); + + if (join2_idx < 0) break; /* Nothing to join. 
*/ + /* Use -1 to mark 'used' */ + dtlog_array_set(table_sz, join2_idx, dtlog_i2p(-1)); + + dtlog_table_t* table2 = dtlog_map_get( + &eng->tables, dtlog_array_get(rule_reorder, join2_idx)); + DTLOG_T(dtlog_ary_t*, int32_t) param2 = dtlog_array_get( + &rule->param, dtlog_array_get_int(natural_order, join2_idx)); + + dtlog_join_param_t* joinp = + eng_gen_join_param(param1, param2, table_sz, natural_order, rule); + dtlog_table_t* table1n = dtlog_tblopr_join(table1, table2, joinp); + + dtlog_table_free(table1); + dtlog_array_free(param1); + + param1 = dtlog_array_clone(&joinp->out_param); + dtlog_join_param_free(joinp); + table1 = table1n; + } + + /* STEP 3: reorder the final table. */ + dtlog_ary_t* final_order = + dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + + DTLOG_ARRAY_ALL_INT((dtlog_ary_t*)dtlog_array_get(&rule->param, 0), p) + if (p < -1) dtlog_array_add(final_order, dtlog_i2p(p)); + else dtlog_array_add( + final_order, + dtlog_i2p(dtlog_array_look_for(param1, dtlog_i2p(p)))); + DTLOG_ARRAY_END + + tblopr_reorder_table(table1, final_order, &rule->const_param, output); + output->is_remove = input->is_remove; + + dtlog_table_free(table1); + dtlog_array_free(param1); + + dtlog_array_free(table_sz); + dtlog_array_free(rule_reorder); + dtlog_array_free(natural_order); + dtlog_array_free(final_order); +} + +/* The basic assumption about external function (and all) is that the result + * on the left side does not depend on the order in which delta changes to + * the right-side tables are applied. Since the original input table can be + * obtained inside the function, the computation may rely on the delta + * (preferable for performance) or on the whole set, but C=A\B still cannot + * be computed because the order of delta application is relevant. + * + * The computation order is based on topological sort, and guarantees that + * each rule will only be applied once. 
E.g., for C:A, B; D:C, if the input + * contains A and B, D will be computed only after both A and B have been + * applied to C. + * + * When input tables are checked (no adding of an existing tuple, no removing + * of a non-existing tuple, and adding does not overlap with removing), the + * order of adding and removing is not relevant. inp_del_tables is applied + * before inp_add_tables for performance reasons. + * (not validated yet) + * + * Tuples in right-side tables are always regarded as having count 1. + * Tuples in the left-side table have the actual count. + * + * inp_del_tables and inp_add_tables must be paired, i.e., + * del_tables[i].id == add_tables[i].id holds for all i. + */ + +static void +eng_delta0(dtlog_engine_t* eng, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) inp_del_tables, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) inp_add_tables, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) del_out_tables, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) add_out_tables) +{ + /* Input tables will be destroyed, but the input arrays still need to be + * freed by the caller. */ + dtlog_set_t* gv = eng->m.glb_values; + + dtlog_ary_t* first_rule = + dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), 0, eng->m.glb_values); + + DTLOG_ARRAY_ALL(inp_del_tables, table, dtlog_table_t*) + /* Item 0 is the smallest in each set. */ + int32_t v = dtlog_array_get_int( + dtlog_map_get(&eng->rule_set.table_rule_map, + dtlog_i2p(table->table_index)), 0); + + dtlog_array_add(first_rule, dtlog_i2p(v)); + DTLOG_ARRAY_END + + /* There might be ties in the sort. Just pick by the initial order. */ + dtlog_sort_array(0, first_rule, inp_del_tables, inp_add_tables); + + while (dtlog_array_size(inp_del_tables) > 0) { + /* inp_d_tables always have 1 as count, but an out_d_table will have the + * actual count before turning into an inp_d_table. Final output tables + * will have the actual count. + */ + + dtlog_array_rmv(first_rule, 0); + /* 'd' stands for delta. 
*/ + dtlog_table_t* inp_d_del_table = dtlog_array_rmv(inp_del_tables, 0); + dtlog_table_t* inp_d_add_table = dtlog_array_rmv(inp_add_tables, 0); + int32_t inp_table_id = inp_d_del_table->table_index; + + if (DTLOG_LOG_COMP) { + char buf[8192]; int32_t pos; + pos = 0; pos = dtlog_table_print(buf, pos, inp_d_del_table, false); + buf[pos] = 0; printf("[LOG] delta - %s\n", buf); + pos = 0; pos = dtlog_table_print(buf, pos, inp_d_add_table, false); + buf[pos] = 0; printf("[LOG] delta + %s\n", buf); + } + + if (dtlog_map_has(&eng->rule_set.output_tables, + dtlog_i2p(inp_table_id))) { + + if (dtlog_table_size(inp_d_del_table) > 0) + dtlog_array_add(del_out_tables, inp_d_del_table); + else dtlog_table_free(inp_d_del_table); + + if (dtlog_table_size(inp_d_add_table) > 0) + dtlog_array_add(add_out_tables, inp_d_add_table); + else dtlog_table_free(inp_d_add_table); + continue; + } + + if (!dtlog_map_has(&eng->rule_set.input_tables, + dtlog_i2p(inp_table_id))) { + + tblopr_gen_delta(inp_d_del_table, + dtlog_map_get(&eng->tables, dtlog_i2p(inp_table_id))); + tblopr_gen_delta(inp_d_add_table, + dtlog_map_get(&eng->tables, dtlog_i2p(inp_table_id))); + } + + if (dtlog_table_size(inp_d_del_table) == 0 && + dtlog_table_size(inp_d_add_table) == 0) { + + dtlog_table_free(inp_d_del_table); + dtlog_table_free(inp_d_add_table); + continue; + } + + dtlog_table_t* inp_d_del_table_c1 = eng_reset_count(inp_d_del_table); + dtlog_table_t* inp_d_add_table_c1 = eng_reset_count(inp_d_add_table); + + DTLOG_T(dtlog_ary_t*, int32_t) rule_order = dtlog_map_get( + &eng->rule_set.table_rule_map, dtlog_i2p(inp_table_id)); + + DTLOG_ARRAY_ALL_INT(rule_order, rule_no) + dtlog_rule_t* rule = dtlog_map_get( + &eng->rule_set.rules, dtlog_i2p(rule_no)); + + dtlog_table_t* out_d_del_table = dtlog_table_init( + 0, rule_no, + dtlog_map_get_int(&eng->rule_set.param_size, + dtlog_i2p(rule_no)), 0, gv); + + dtlog_table_t* out_d_add_table = dtlog_table_init( + 0, rule_no, + 
dtlog_map_get_int(&eng->rule_set.param_size, + dtlog_i2p(rule_no)), 0, gv); + + out_d_del_table->is_remove = true; + if (eng_invoke_external + (eng, inp_d_del_table_c1, out_d_del_table, out_d_add_table)) { + + bool keep = eng_merge_output(eng, + first_rule, out_d_del_table, + out_d_add_table, inp_del_tables, inp_add_tables); + + if (!keep) { + dtlog_table_free(out_d_del_table); + dtlog_table_free(out_d_add_table); + } + + out_d_del_table = dtlog_table_init( + 0, rule_no, + dtlog_map_get_int(&eng->rule_set.param_size, + dtlog_i2p(rule_no)), 0, gv); + + out_d_add_table = dtlog_table_init( + 0, rule_no, + dtlog_map_get_int(&eng->rule_set.param_size, + dtlog_i2p(rule_no)), 0, gv); + + out_d_del_table->is_remove = true; + eng_invoke_external(eng, + inp_d_add_table_c1, out_d_del_table, out_d_add_table); + /* Will also merge below. */ + } else if (rule->is_union) { + dtlog_eng_do_union(eng, inp_d_del_table_c1, out_d_del_table); + dtlog_eng_do_union(eng, inp_d_add_table_c1, out_d_add_table); + } else { + dtlog_eng_do_join(eng, inp_d_del_table_c1, out_d_del_table); + dtlog_eng_do_join(eng, inp_d_add_table_c1, out_d_add_table); + } + + bool keep = eng_merge_output( + eng, first_rule, out_d_del_table, + out_d_add_table, inp_del_tables, inp_add_tables); + + if (!keep) { + dtlog_table_free(out_d_del_table); + dtlog_table_free(out_d_add_table); + } + DTLOG_ARRAY_END /* For all rules related to one table input. */ + + /* Merge input table. */ + tblopr_final_delta(inp_d_del_table, + dtlog_map_get(&eng->tables, + dtlog_i2p(inp_d_del_table->table_index))); + + tblopr_final_delta(inp_d_add_table, + dtlog_map_get(&eng->tables, + dtlog_i2p(inp_d_add_table->table_index))); + + dtlog_table_free(inp_d_del_table_c1); + dtlog_table_free(inp_d_add_table_c1); + dtlog_table_free(inp_d_del_table); + dtlog_table_free(inp_d_add_table); + } /* For all inputs. 
*/ + + dtlog_array_free(first_rule); +} + +DTLOG_T(dtlog_ary_t*, dtlog_table_t*) +dtlog_eng_delta(dtlog_engine_t* eng, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_remove, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_insert) +{ + dtlog_set_t* gv = eng->m.glb_values; + dtlog_ary_t* del_out = dtlog_array_init( + NULL, DTLOG_KEY(DTLOG_T_TABLE), 0, gv); + + /* No type id is given, so freeing the array will not free the tables. */ + dtlog_ary_t* add_out = dtlog_array_init(NULL, 0, 0, gv); + dtlog_ary_t* inp_rmv = dtlog_array_init( + NULL, 0, DTLOG_KEY(DTLOG_T_TABLE), gv); + dtlog_ary_t* inp_ins = dtlog_array_init( + NULL, 0, DTLOG_KEY(DTLOG_T_TABLE), gv); + + DTLOG_MAP_ALL(inp_remove, tn) + dtlog_array_add(inp_rmv, tn->value); + DTLOG_MAP_END + + DTLOG_MAP_ALL(inp_insert, tn) + dtlog_array_add(inp_ins, tn->value); + DTLOG_MAP_END + + eng_align_tables(eng, inp_rmv, inp_ins); + eng_delta0(eng, inp_rmv, inp_ins, del_out, add_out); + + /* Reset the state of the external function. */ + eng_invoke_external(eng, NULL, NULL, NULL); + DTLOG_ARRAY_ALL(add_out, t, dtlog_table_t*) + dtlog_array_add(del_out, t); + DTLOG_ARRAY_END + + dtlog_array_free(inp_rmv); + dtlog_array_free(inp_ins); + dtlog_array_free(add_out); + inp_remove->m.type = DTLOG_KEY(DTLOG_KTYPE(inp_remove->m.type)); + inp_insert->m.type = DTLOG_KEY(DTLOG_KTYPE(inp_insert->m.type)); + return del_out; +} + +/* -------------------------------------------------------------------------- + * SERIALIZATION + * -------------------------------------------------------------------------- + */ + +/* With respect to table operations, the only computation between fields is + * comparison, so there is no need to maintain schema data. If a schema is + * needed for an external function or for upgrade (e.g., data conversion + * based on versions), then schema data, version, and documentation (e.g., + * description of tables and fields, validations) could be defined by a + * special table. 
+ * Integer could be represented by digits or binary form (e.g., in network + * byte order). + */ + +void +dtlog_io_marshall(const char* val, int32_t sz, dtlog_buf_t* buf) +{ + int32_t i; + dtlog_buf_ensure(buf, sz * 2); /* Max possible length. */ + + for (i = 0;i < sz;i++) { + char c = val[i]; + + if (c == dtlog_config.sep1) { + buf->buf[buf->pos++] = dtlog_config.esc; + buf->buf[buf->pos++] = '1'; + } else if (c == dtlog_config.sep2) { + buf->buf[buf->pos++] = dtlog_config.esc; + buf->buf[buf->pos++] = '2'; + } else if (c == dtlog_config.esc) { + buf->buf[buf->pos++] = dtlog_config.esc; + buf->buf[buf->pos++] = '0'; + } else buf->buf[buf->pos++] = c; + } +} + +int32_t +dtlog_io_unmarshall(char* buf, int32_t sz) +{ + /* 'buf' will be changed and hold result. */ + int32_t i, j; + + for (i = j = 0;j < sz;i++, j++) { + if (buf[j] == dtlog_config.esc) { + switch (buf[++j]) { + case '1': buf[i] = dtlog_config.sep1; break; + case '2': buf[i] = dtlog_config.sep2; break; + case '0': buf[i] = dtlog_config.esc; break; + default: dtlog_assert(false); + } + } else if (j > i) buf[i] = buf[j]; + } + return i; +} + +bool +dtlog_io_decode_0(dtlog_engine_t* eng, const char* buf, int32_t buf_sz, + dtlog_value_t* extra_key, dtlog_value_t* null_str, bool check, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_remove, + DTLOG_T2(dtlog_map_t*, int32_t, dtlog_table_t*) inp_insert) +{ + int32_t i, pos, tbl_idx; + bool is_remove, is_query; + + dtlog_set_t* gv = eng->m.glb_values; + is_query = buf_sz > 0 && buf[0] == '?'; + + for (pos = 0;pos < buf_sz;) { + for (i = 0;buf[pos + i] != dtlog_config.sep2;i++); + if (i < 3 || buf[pos + 1] != dtlog_config.sep1) return false; + + if (!strchr("+-?", buf[pos])) return false; + if (buf[pos] != '?' && is_query) return false; + is_remove = buf[pos] == '-'; + + /* Check if table name is value. 
*/ + tbl_idx = dtlog_get_table_id(eng, buf + pos + 2, i - 2); + if (tbl_idx < 0) return false; + pos += i + 1; + + while (pos < buf_sz && !strchr("+-?", buf[pos])) { + dtlog_tuple_t* tuple = dtlog_tuple_init_str_raw( + buf + pos, &i, extra_key, is_query ? null_str : NULL, gv); + + pos += i; + if (tuple == NULL) return false; + if (check && !is_query) { + eng_check_put_tuple(eng, tuple, is_remove, tbl_idx, + inp_remove, inp_insert); + } else { + eng_put_tuple(eng, tuple, is_remove, tbl_idx, + inp_remove, inp_insert); + } + } + } + return true; +} + +dtlog_buf_t* +dtlog_io_encode(dtlog_engine_t* eng, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) out) +{ + dtlog_buf_t* buf = dtlog_buf_init(NULL); + DTLOG_ARRAY_ALL(out, tbl, dtlog_table_t*) + dtlog_value_t* tbl_name = dtlog_map_get( + &eng->rule_set.rule_name_map, dtlog_i2p(tbl->table_index)); + + dtlog_buf_ensure(buf, 3 + tbl_name->size); + buf->pos += sprintf(buf->buf + buf->pos, "%c%c%s%c", + tbl->is_remove ? '-' : '+', dtlog_config.sep1, + tbl_name->value.a, dtlog_config.sep2); + + DTLOG_SET_ALL(&tbl->tuples, tuple, dtlog_tuple_t*) + dtlog_tuple_print_raw(buf, tuple, 0); + DTLOG_SET_END + DTLOG_ARRAY_END + return buf; +} + +void +dtlog_io_encode_extra(dtlog_engine_t* eng, + DTLOG_T(dtlog_ary_t*, dtlog_table_t*) out, + DTLOG_T2(dtlog_map_t*, dtlog_value_t*, dtlog_buf_t*) buf_all) +{ + DTLOG_ARRAY_ALL(out, tbl, dtlog_table_t*) + dtlog_value_t* tbl_name = dtlog_map_get( + &eng->rule_set.rule_name_map, dtlog_i2p(tbl->table_index)); + + DTLOG_SET_ALL(&tbl->tuples, tuple, dtlog_tuple_t*) + dtlog_value_t* key = tuple->values[0]; + bool print_header = false; + + dtlog_buf_t* buf = dtlog_map_get(buf_all, key); + if (buf == NULL) { + buf = dtlog_buf_init(NULL); + buf->aux = tbl->table_index; + dtlog_map_add(buf_all, key, buf); + print_header = true; + } + + if (buf->aux != tbl->table_index) print_header = true; + if (print_header) { + dtlog_buf_ensure(buf, 3 + tbl_name->size); + buf->pos += sprintf(buf->buf + buf->pos, "%c%c%s%c", 
+ tbl->is_remove ? '-' : '+', dtlog_config.sep1, + tbl_name->value.a, dtlog_config.sep2); + } + dtlog_tuple_print_raw(buf, tuple, 1); + DTLOG_SET_END + DTLOG_ARRAY_END +} + +/* -------------------------------------------------------------------------- + * EXTERNAL API + * -------------------------------------------------------------------------- + */ + +void* +dtlog_init(const char* rules, void* func) +{ + /* Will call exit(1) in case rules are incorrect. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = dtlog_eng_parse(rules, gv); + dtlog_eng_set_ext_func(eng, func); + + eng->io.cur_tuple = NULL; + eng->io.inp_insert = NULL; + eng->io.inp_remove = NULL; + eng->io.cur_tbl_idx = -1; + eng->io.res = NULL; + return eng; +} + +void +dtlog_free(void* e) +{ + dtlog_engine_t* eng = e; + dtlog_set_t* gv = eng->tables.m.glb_values; + dtlog_engine_free(eng); + dtlog_set_free(gv); +} + +bool +dtlog_put_table(void* e, bool is_remove, const char* name) +{ + dtlog_engine_t* eng = e; + dtlog_set_t* gv = eng->tables.m.glb_values; + int32_t tbl_idx = dtlog_get_table_id(eng, name, strlen(name)); + dtlog_assert(eng->io.res == NULL); + + if (tbl_idx < 0 || + !dtlog_hash_get(&eng->rule_set.input_tables, dtlog_i2p(tbl_idx))) + return false; + if (eng->io.cur_tuple) return false; /* Incomplete tuple. */ + + if (eng->io.inp_insert == NULL || eng->io.inp_remove == NULL) { + int32_t mode = DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_TABLE); + eng->io.inp_remove = dtlog_map_init(NULL, mode, 0, gv); + eng->io.inp_insert = dtlog_map_init(NULL, mode, 0, gv); + } + + eng->io.cur_tbl_is_remove = is_remove; + eng->io.cur_tbl_idx = tbl_idx; + eng->io.cur_tbl_n_fields = dtlog_map_get_int(&eng->rule_set.param_size, + dtlog_i2p(tbl_idx)); + + return true; +} + +void +dtlog_put_value(dtlog_engine_t* eng, dtlog_value_t* value) +{ + /* 'value' is treated as c-str if len is zero. 
*/ + dtlog_assert(eng->io.res == NULL && eng->io.cur_tbl_idx >= 0); + + if (eng->io.cur_tuple == NULL) { + eng->io.cur_tuple = dtlog_tuple_init(eng->io.cur_tbl_n_fields); + eng->io.cur_tuple->count = 1; + eng->io.clm_idx = 0; + } + + eng->io.cur_tuple->values[eng->io.clm_idx++] = value; + if (eng->io.clm_idx >= eng->io.cur_tbl_n_fields) { + dtlog_tuple_set_hash_code(eng->io.cur_tuple, + eng->io.cur_tbl_n_fields); + + eng_check_put_tuple(eng, eng->io.cur_tuple, + eng->io.cur_tbl_is_remove, eng->io.cur_tbl_idx, + eng->io.inp_remove, eng->io.inp_insert); + eng->io.cur_tuple = NULL; + } +} + +void +dtlog_put_field(void* e, void* value, int32_t len) +{ + /* 'value' is treated as c-str if len is zero. */ + dtlog_engine_t* eng = e; + dtlog_set_t* gv = eng->tables.m.glb_values; + dtlog_put_value(eng, value == NULL ? NULL : + dtlog_value_init(value, len, gv)); +} + +void +dtlog_opr(void* e, bool query) +{ + dtlog_engine_t* eng = e; + dtlog_assert(eng->io.res == NULL && + eng->io.inp_insert != NULL && eng->io.inp_remove != NULL); + + if (query) { + /* Must provide one tuple in inp_insert table. */ + dtlog_assert(dtlog_map_size(eng->io.inp_insert) == 1 && + dtlog_map_size(eng->io.inp_remove) == 0); + eng->io.res = dtlog_eng_query(eng, eng->io.inp_insert); + } else { + eng->io.res = dtlog_eng_delta(eng, + eng->io.inp_remove, eng->io.inp_insert); + } + + dtlog_map_free(eng->io.inp_remove); + dtlog_map_free(eng->io.inp_insert); + eng->io.inp_remove = NULL; + eng->io.inp_insert = NULL; + eng->io.cur_tbl_idx = -1; + eng->io.tbl_idx = 0; +} + +bool +dtlog_get_table(void* e, bool* is_remove, const char** name, + int32_t* n_tuples, int32_t* n_fields) +{ + /* Return false if there is no more table. 
*/ + dtlog_engine_t* eng = e; + dtlog_assert(eng->io.inp_remove == NULL && eng->io.inp_insert == NULL); + + if (eng->io.tbl_idx >= dtlog_array_size(eng->io.res)) { + dtlog_array_free(eng->io.res); + eng->io.res = NULL; + eng->io.cur_tuple = NULL; + return false; + } + + eng->io.cur_tbl = dtlog_array_get(eng->io.res, eng->io.tbl_idx++); + *is_remove = eng->io.cur_tbl->is_remove; + *n_tuples = dtlog_table_size(eng->io.cur_tbl); + *n_fields = eng->io.cur_tbl->num_fields; + + dtlog_value_t* val = dtlog_map_get(&eng->rule_set.rule_name_map, + dtlog_i2p(eng->io.cur_tbl->table_index)); + *name = val->value.a; + + eng->io.hash_b = 0; + eng->io.hash_pre = NULL; + eng->io.clm_idx = eng->io.cur_tbl->num_fields; + return true; +} + +bool +dtlog_get_field(void* e, void* res, int32_t* sz) +{ + /* Return false if switches to another table. */ + void** v = (void**)res; + dtlog_engine_t* eng = e; + + dtlog_assert(eng->io.inp_remove == NULL && eng->io.inp_insert == NULL + && eng->io.cur_tbl != NULL); + int32_t n_fields = eng->io.cur_tbl->num_fields; + + if (eng->io.clm_idx < n_fields) { + dtlog_value_t* val = eng->io.cur_tuple->values[eng->io.clm_idx++]; + *v = val->value.a; + *sz = val->size; + return true; + } + + dtlog_map_node_t* nd = dtlog_hash_next(&eng->io.cur_tbl->tuples, false, + &eng->io.hash_b, &eng->io.hash_pre); + if (!nd) return false; + + eng->io.clm_idx = 0; + eng->io.cur_tuple = nd->key; + dtlog_value_t* val = eng->io.cur_tuple->values[eng->io.clm_idx++]; + + *v = val->value.a; + *sz = val->size; + return true; +} + +/* TODO: add stats to value, hash, and table operation. + */ diff --git a/ovn/lib/datalog.h b/ovn/lib/datalog.h new file mode 100644 index 0000000..9726297 --- /dev/null +++ b/ovn/lib/datalog.h @@ -0,0 +1,57 @@ +/* Copyright (c) 2016 VMware, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OVN_DATALOG_H +#define OVN_DATALOG_H 1 + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Functions should be called in the following sequence: + * ( init (put_table (put_field)*)* opr (get_table (get_field)*)* free ) + * + * dtlog_init() returns handle of the engine and will be used in all other + * calls. dtlog_put_table() returns false if table name is invalid. + * 'size' could be zero for dtlog_put_field() and null-terminated c-str is + * assumed for this case. dtlog_get_table() returns false if all tables + * have been retrieved. dtlog_get_field() returns false if the last tuple + * of a table has been retrieved. + * + * All values are assumed to be read only for both put and get functions. + * Refer to test-datalog.c for examples. + */ + +void* dtlog_init(const char* rules, void* ext_func); + +bool dtlog_put_table(void* eng, bool is_remove, const char* name); +void dtlog_put_field(void* eng, void* value, int32_t size); + +void dtlog_opr(void* eng, bool is_query /* Query or delta change. */); + +bool dtlog_get_table(void* eng, bool* is_remove, const char** name, + int32_t* n_tuples, int32_t* n_fields); +bool dtlog_get_field(void* eng, void* v, int32_t* size); +void dtlog_free(void* eng); + +#ifdef __cplusplus +} +#endif + +#endif /* ovn_datalog.h */ diff --git a/ovn/lib/ovn-datalog.7.xml b/ovn/lib/ovn-datalog.7.xml new file mode 100644 index 0000000..b70634d --- /dev/null +++ b/ovn/lib/ovn-datalog.7.xml @@ -0,0 +1,493 @@ + + + +

Name

+

ovn-datalog -- Open Virtual Network Datalog

+ +

Synopsis

+
+#include "datalog.h"
+
+ +
+void* dtlog_init(const char* rules, void* ext_func);
+void  dtlog_free(void* d);
+void  dtlog_opr(void* d, bool query);
+
+ +
+bool  dtlog_put_table(void* d, bool is_remove, const char* name);
+void  dtlog_put_field(void* d, void* value, int32_t size);
+
+ +
+bool  dtlog_get_table(void* d, bool* is_remove, const char** name, 
+    int32_t* n_tuples, int32_t* n_fields);
+bool  dtlog_get_field(void* d, void** value, int32_t* size);
+
+ +

Description

+ +

+dtlog_init() initializes an instance of the datalog engine. rules +specifies the datalog program. ext_func, which is optional, +specifies the external function of the engine. +

+ +

+Each instance of engine is defined by its program, its state, and temporary +data for input and output tables. The state consists of content of all +input tables and intermediate tables. +

+ +

+The datalog engine works in a pipelined manner. An input table is constructed by first +calling dtlog_put_table(), followed by a series of +calls to dtlog_put_field(). Data are streamed field by field, then tuple by +tuple. name indicates the name of the table. is_remove +indicates whether the tuples constructed by subsequent calls of +dtlog_put_field() are for deletion (true) or for addition (false). +

+ +

+To provision multiple tables, the above sequence should be applied to each input +table. dtlog_put_table() may be called any number of times for each table. Changes +are applied in sequence, e.g., if tuple (1, 2) is first added and then removed, +the two changes cancel out and are ignored. +

+ +

+Output tables are retrieved in a similar manner. +

+

+size may be 0 for a null-terminated string. When the value of a field +is an arbitrary byte array, size must indicate the length of the +array. +

+ +

+Input tables are temporarily buffered in datalog engine instance and will be +cleared once the operation is performed by calling dtlog_opr(). Output tables +are temporarily buffered in datalog engine instance and will be cleared once all +output tables are retrieved. +

+ +

+Value will never be changed or referred to by the engine after calling the +function dtlog_put_field(). Value obtained from output table is a reference +to data inside the engine and should never be changed. The value is valid as +long as some tuple still holds that value. +

+

+The engine implements two operations: +

+ +
    +
  • +Incremental change. The input is a set of tables, each containing a set of +tuples. The output is incremental change of tables caused by the incremental +change of input. +
  • + +
+Table query. Each tuple specifies one query condition. For each field of the tuple, +NULL indicates that the field is not used for matching; a non-NULL value indicates that +only tuples that match the field value will be retrieved. The output is a set +of tables, each of which is the query result of one query condition. A query +cannot be applied to an output table. +
  • +
+ +

+The behavior is undefined if incomplete tuples are provisioned or retrieved. +

+ +

+The usage of external function is illustrated in the NOTES section. +

+ +

+Example: +

+ +
+void* d = dtlog_init("R2(a,b):r2(a,b);R1(a):r1(a).",
+    /* external function not provided */ NULL);
+
+ +
+dtlog_put_table(d, /* adding tuple */ false, "r1");
+dtlog_put_field(d, "r1_a", 0); /* 1st field of 1st tuple for r1 */
+dtlog_put_table(d, false, "r2");
+dtlog_put_field(d, "r2_a", /* c-str */ 0);/* 1st field of 1st tuple */
+dtlog_put_field(d, "r2_b", /* len */ 4); /* 2nd field of 1st tuple */
+
+ +
+dtlog_opr(d, /* change */ false); /* run the engine */
+
+ +
+/* output variables filled in by the calls below */
+bool is_remove; const char* name; int32_t n_tuples, n_fields, sz; void* value;
+/* get the 1st table, assume it is R2 */
+dtlog_get_table(d, &is_remove, &name, &n_tuples, &n_fields);
+dtlog_get_field(d, &value, &sz); /* 1st field of 1st tuple */
+dtlog_get_field(d, &value, &sz); /* 2nd field of 1st tuple */
+/* get the 2nd table, assume it is R1 */
+dtlog_get_table(d, &is_remove, &name, &n_tuples, &n_fields);
+dtlog_get_field(d, &value, &sz); /* 1st field of 1st tuple */
+
+ +
+dtlog_free(d);
+
+ +

RETURN VALUE

+ +

+dtlog_init() returns a handle to the engine instance. The handle is +used in all other calls. If there is an error in the datalog program, +dtlog_init() will call exit(1). +

+ +

+dtlog_put_table() returns false if the specified table name is invalid. +dtlog_get_table() returns false if there is no more table content to retrieve. +dtlog_get_field() returns false if all fields of all tuples of the table +have been retrieved. The last invocation could be skipped since the number +of tuples is known after calling dtlog_get_table(). +

+ +

+The output parameter n_tuples indicates the number of tuples in the +table and n_fields indicates the number of fields in each tuple. +size does not include the terminating NUL character of a null-terminated string. +

+ +

NOTES

+

+The following sections define the syntax, semantics, and run time behavior +of the OVN datalog program. +

+ +

SYNTAX OF OVN DATALOG

+ +
+<token>          ::= [_a-zA-Z][_a-zA-Z0-9]*
+<literal>        ::= '[^']*' 
+
+ +

+Literals are enclosed in single quotes and may span multiple lines. +Comments start with a pound character (#) and extend to the end of the line. Blank characters are +ignored. Tokens are case sensitive. +

+ +
+<table name>     ::= <token>
+<field name>     ::= <token>
+
+<field>          ::= <field name> | <literal> | "-"
+<field list>     ::= <field> | <field list> "," <field>
+
+<table>          ::= <table name> "(" <field list> ")"
+<table list>     ::= <table> | <table list> <table>
+
+<join table>     ::= <table> ":" <table list>
+<union table>    ::= <table> ">" <table list>
+
+<rule>           ::= <join table> | <union table>
+<rule list>      ::= <rule> | <rule list> ";" <rule>
+<program>        ::= <rule list> "."
+
+ +

+Example: +

+ +
+        Aa2(a) : a1(a, 'aa', -); 
+        Aa3(a, 'bb') : Aa2(a) a1(a, -, b) Aa2(b);
+        A4(x, y) > Aa3(x, y) a2(y, x).
+
+ +

+Each tuple is an ordered list of field values. Field names are only used for +specifying matching criteria. All tuples of a table have the same number +of fields. The field "-" is called an 'ignored' field. +

+ +

+For each rule, the content of the left side table, e.g., Aa2 in the above example, +is determined by the content of all right side tables, e.g., a1 in the above +example. A table may appear on the left side of at most one rule. +

+ +

+When a table appears only on the right side of rules, it is called an +input table. When a table appears only on the left side, it is called an output +table. When a table appears on both the left side and the right side, it is called an +intermediate table. +

+ +

+Input tables must use all lower case names. Output tables must use all upper +case names. Intermediate tables must use both upper case and lower case in their +names. On the right side of any rule, at most one table may appear twice. +Recursion is not supported, so a program such as the one shown below is invalid. +

+ +
+
+        Transitive(x,z) : relation(x,y) relation(y,z);
+        Transitive(x,z) : relation(x,y) Transitive(y,z);
+        TRANSITIVE(x,y) : Transitive(x, y).
+
+ +

+OVN datalog program implements two operations. +

+ +
    +
+Union. The content of the left side table is the union of the content of all right +side tables. No literal or 'ignored' field is allowed in a union rule. +
  • + +
  • +

    +Join. The content of left side table is the 'join' of all right side +tables. Joining produces its result in 4 steps illustrated below. +

    +
  • +
+ +
+                                                     Step 1
+Example:              u(a,b)       v(x,y)       u(a,b)  v(x,y) 
+                     +---+---+    +---+---+    +---+---+---+---+
+C(a,c):              | 1 | 2 |    | 2 | 3 |    | 1 | 2 | 2 | 3 | 
+  u(a,b)             +---+---+    +---+---+    +---+---+---+---+        
+  v(b,c).            | 1 | 3 |    | 2 | 4 | => | 1 | 2 | 2 | 4 | 
+                     +---+---+    +---+---+    +---+---+---+---+        
+                                  | 3 | 3 |    | 1 | 2 | 3 | 3 |
+                                  +---+---+    +---+---+---+---+         
+                                               | 1 | 3 | 2 | 3 |         
+ Step 4        Step 3           Step 2         +---+---+---+---+         
+ C(a,c)      u(a) v(y)     u(a,b)  v(x,y)      | 1 | 3 | 2 | 4 |
++---+---+    +---+---+    +---+---+---+---+    +---+---+---+---+
+| 1 | 3 |    | 1 | 3 |    | 1 | 2 | 2 | 3 |    | 1 | 3 | 3 | 3 |         
++---+---+    +---+---+    +---+---+---+---+    +---+---+---+---+         
+| 1 | 4 | <= | 1 | 4 | <= | 1 | 2 | 2 | 4 | <=    u.b == v.x     
++---+---+    +---+---+    +---+---+---+---+    matching criteria
+             | 1 | 3 |    | 1 | 3 | 3 | 3 |
+             +---+---+    +---+---+---+---+
+
+ +
+Step 1: Perform full join operation over all right side tables.
+Step 2: Filter the result tuple set based on matching criteria.
+Step 3: Drop the fields that do not appear in left side table.
+Step 4: Merge duplicate tuples and reorder fields.
+
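The four steps can be sketched for the specific rule C(a,c) : u(a,b) v(b,c) as follows. This is a naive standalone illustration, not the engine's implementation (which uses indexes and reference counts); `struct pair` and `join_u_v` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct pair { int first, second; };

/* Computes C(a,c) : u(a,b) v(b,c) the naive way.  Writes the distinct
 * result tuples to 'out' (capacity 'cap') and returns their count. */
size_t
join_u_v(const struct pair *u, size_t n_u,
         const struct pair *v, size_t n_v,
         struct pair *out, size_t cap)
{
    size_t n_out = 0, i, j, k;

    for (i = 0; i < n_u; i++) {
        for (j = 0; j < n_v; j++) {           /* Step 1: full join. */
            bool dup = false;
            struct pair t;

            if (u[i].second != v[j].first) {  /* Step 2: u.b == v.x. */
                continue;
            }
            t.first = u[i].first;             /* Step 3: keep a and c. */
            t.second = v[j].second;

            for (k = 0; k < n_out; k++) {     /* Step 4: merge duplicates. */
                if (out[k].first == t.first && out[k].second == t.second) {
                    dup = true;
                    break;
                }
            }
            if (!dup) {
                assert(n_out < cap);
                out[n_out++] = t;
            }
        }
    }
    return n_out;
}
```

With u = { (1, 2), (1, 3) } and v = { (2, 3), (2, 4), (3, 3) } from the diagram, the function produces the two tuples (1, 3) and (1, 4).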
+ +

+When a literal appears in a right side table, only +those tuples whose specified field matches the literal are included. When it appears +in the left side table, that value is always provisioned in the specified +field. +

+ +

+An 'ignored' field is neither used +in matching criteria nor copied to the left side table. +An 'ignored' field never appears in a left side table. +

+ +

+The engine computes incrementally, i.e., it only +accepts incremental changes to a set of input tables, and it +computes the incremental change of the output tables. A change takes the form of +added tuples or deleted tuples. +

+ +

+Example: assume initially, +

+
+        u(a,b) contains { (1, 2), (1, 3) },
+        v(x,y) contains { (2, 3), (1, 3) }; then
+        C(a,c) contains { (1, 3) }
+
+

+If { (2, 4) } is added to v(x,y), the engine will output { (1, 4) }, making +the current state of C(a,c) { (1, 3), (1, 4) }. +

+ +

METHOD OF COMPUTATION

+

+The test case test_interactive is an interactive tool that shows the usage +of the engine. +

+ +

+The following steps are performed when the engine is initialized: +

+

+1. Parse the program. +

+

+2. Do a topological sort on tables, treating each left side table as dependent on its right side +tables. The sort will fail if there is a circular dependency (recursion). +

+

+3. Name the tables and fields by numbers. A table's number reflects its topological +order. Rules are rewritten using numbers and saved in the rule set. +

+

+4. Create an empty table for each input table and intermediate table. +
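Step 2 of the initialization can be sketched as a topological sort that fails on a cycle. The version below is a standalone illustration using a flat dependency matrix; it is not the engine's code, and `MAX_TABLES` and `topo_sort` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_TABLES 8

/* dep[l * n + r] is true when table 'l' is the left side of a rule that
 * has table 'r' on its right side.  Fills 'order' so that every table
 * comes after the tables it depends on.  Returns false if the
 * dependencies are circular, i.e., the program is recursive. */
bool
topo_sort(int n, const bool *dep, int *order)
{
    int pending[MAX_TABLES];            /* Unplaced dependencies. */
    bool placed[MAX_TABLES] = { false };
    int n_placed = 0, l, r;

    assert(n >= 0 && n <= MAX_TABLES);
    for (l = 0; l < n; l++) {
        pending[l] = 0;
        for (r = 0; r < n; r++) {
            if (dep[l * n + r]) pending[l]++;
        }
    }

    while (n_placed < n) {
        int next = -1;

        for (l = 0; l < n; l++) {
            if (!placed[l] && pending[l] == 0) {
                next = l;
                break;
            }
        }
        if (next < 0) {
            return false;               /* Every remaining table waits. */
        }
        placed[next] = true;
        order[n_placed++] = next;
        for (l = 0; l < n; l++) {
            if (dep[l * n + next]) pending[l]--;
        }
    }
    return true;
}
```

A table that depends on itself, such as Transitive in the recursion example, never reaches a pending count of zero, so the sort reports failure.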

+

+All values of tuple fields used in the engine are kept in a value +dictionary, and values are only accessed through references. +

+

+For input table and intermediate table, indexes will be automatically created +when joining operation is performed. Assume there is rule: +

+
+        A(x, k) : a1(x, y) a1(x, z) a2(y, z, k).
+
+ +

+Indexes on (a1.x) and (a2.y, a2.z) will be created. One table may have multiple +indexes. In this example, a1 is a 'self join' since it appears twice on the right side. +

+

+Each tuple of a left side table has a reference count. It indicates +the number of duplicate tuples that are generated after full join and +criteria matching. In the join example of the previous section, (1, 3) in C(a, c) has count 2. +

+

+The following steps are performed for change operation: +

+

+1. Input tables are normalized, i.e., if there is addition of existing tuple +or deletion of non existing tuple, the tuple is ignored. Addition of a tuple +followed by deletion of the same is also ignored. + +

+

+2. The addition table and deletion table are paired, i.e., for the change set, +if there is only an addition table +T, an empty table is generated for the same +table as -T, and vice versa. The output is an array of paired change tables: +({-T1, +T1}, {-T2, +T2}, ..., {-Tn, +Tn}) +

+

+3. Sort the array based on table number. +

+
+4. While the array is not empty:
+4.1 Remove the first item from the array, denote it as -T, +T.
+4.2 Create a copy of -T, +T, but set its tuple reference count to 1.
+4.3 For all rules that have T on the right side:
+4.3.1 Check if the rule could be computed by external function.
+4.3.2 If yes, invoke external rule function with -T and +T respectively.
+4.3.3 If not and the rule is union, do union with -T and +T respectively.
+4.3.4 If not and the rule is join, do join with -T and +T respectively.
+4.3.5 Merge the output of above operation to left side table.
+4.3.6 If the last step generates new -T' and +T', insert them into the 
+            change array based on its table number.
+4.4 Merge -T and +T to original table respectively.
+
+

+Merging operation is based on tuple's reference count. For a left side table, +when a tuple is first seen or a tuple's count decreases to 0, that tuple is +added to the new incremental change table. The reference count for right side +table is always regarded as 1. +
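The reference count rule for a left side table can be sketched as below. This is a standalone illustration of the rule as described, not the engine's merge code; `enum delta` and `merge_count` are hypothetical names.

```c
#include <assert.h>

/* What a count change means for the incremental change table. */
enum delta { DELTA_NONE, DELTA_ADD, DELTA_DEL };

/* Applies 'change' (positive for derivations gained, negative for
 * derivations lost) to the tuple's reference count '*count'. */
enum delta
merge_count(int *count, int change)
{
    int before = *count;
    int after = before + change;

    assert(before >= 0 && after >= 0);
    *count = after;

    if (before == 0 && after > 0) {
        return DELTA_ADD;   /* Tuple seen for the first time. */
    }
    if (before > 0 && after == 0) {
        return DELTA_DEL;   /* Last derivation went away. */
    }
    return DELTA_NONE;      /* Still present; only the count changed. */
}
```

A tuple derived twice, like (1, 3) with count 2, therefore produces no output change when only one of its derivations is removed.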

+

+Joining is performed as below. Tuple field reordering is carried out as +indicated by the rule. +

+

+1. If the table to be joined involves self join, it is calculated first. The +incremental change is +

+
+        ((dT)X(T)) V ((T)X(dT)) V ((dT)X(dT)) 
+
+

+dT denotes incremental change of T. X denotes join, and V denotes union. +Otherwise, use the table itself as input to step 2. +

+

+2. Choose the table with the smallest number of tuples that 'conditionally joins' + with the above result. Join the two tables and drop fields that are no longer + needed. Repeat this step until there is no conditional join left. 'Conditionally + joins' means that the two tables share a field name, so field matching must + be performed first. +

+

+3. Join with all remaining tables. This is called 'full join'. +
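The self join expression in step 1 can be justified by distributing the join over the union. The derivation below is a sketch under two assumptions that the text does not state: dT contains only additions, and dT is disjoint from T (which input normalization should ensure). Here the join is taken before any fields are dropped.

```latex
\begin{align*}
(T \cup dT) \bowtie (T \cup dT)
  &= (T \bowtie T) \cup (dT \bowtie T) \cup (T \bowtie dT) \cup (dT \bowtie dT) \\
\bigl((T \cup dT) \bowtie (T \cup dT)\bigr) \setminus (T \bowtie T)
  &= (dT \bowtie T) \cup (T \bowtie dT) \cup (dT \bowtie dT)
\end{align*}
```

The second line holds because every joined pair in the last three terms contains at least one tuple from dT and so cannot occur in the join of T with itself. After fields are dropped, duplicates may appear, which is what the reference counts account for.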

+

USE OF EXTERNAL FUNCTION

+

+Join and union operation can never generate new values. When there is a need +to generate new values for a field, external function should be defined. +

+ +

+For example, to generate a unique id for each tuple in a(x, y, z), the +following rule is defined but its implementation is in the external function. +

+
+        B(id, x, y, z) : a(x, y, z) generated_id(id).
+
+

+generated_id is a dummy table to make the rule correct. +

+

+Another example: given right side table a(x, y), the left side table +B(x, y) is defined as having one tuple for each distinct x in a(x, y), whose +second field is an array of all values of y paired with that x in a(x, y). Assume: +

+
+
+        a is { (1, 1), (1, 2), (2, 3) }, then
+        B is { (1, '1, 2'), (2, '3') }
+
+
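The aggregation in this example can be sketched as a plain group-by over the whole table. This standalone illustration computes B from the full content of a; it is not an ext_func implementation, since a real external function receives incremental changes and must use the OVN datalog internal APIs. The names `struct xy`, `struct group`, and `aggregate_by_x` are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_GROUPS 8
#define LIST_LEN 64

struct xy { int x, y; };
struct group { int x; char ys[LIST_LEN]; };

/* Groups 'a' by x and joins the y values of each group with ", ".
 * 'out' must have room for MAX_GROUPS entries.  Returns the number
 * of groups written. */
size_t
aggregate_by_x(const struct xy *a, size_t n, struct group *out)
{
    size_t n_out = 0, i, g;

    for (i = 0; i < n; i++) {
        size_t used;

        /* Look for an existing group with the same x. */
        for (g = 0; g < n_out && out[g].x != a[i].x; g++) {
            continue;
        }
        if (g == n_out) {
            assert(n_out < MAX_GROUPS);
            out[g].x = a[i].x;
            out[g].ys[0] = '\0';
            n_out++;
        }

        /* Append this y, with a separator if the list is non-empty. */
        used = strlen(out[g].ys);
        snprintf(out[g].ys + used, LIST_LEN - used, "%s%d",
                 used ? ", " : "", a[i].y);
    }
    return n_out;
}
```

Note that the order of values inside each list follows the order of tuples in a, so an incremental version would need a canonical ordering to satisfy the restriction that the result must not depend on the order in which changes are applied.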

+Not all kinds of computation could be implemented with external function. The +restriction is that the incremental change of left side table should never depend on +the order of application of incremental change on right side tables. +

+

+External function is defined as +

+
+        bool (*ext_func)(
+            dtlog_engine_t* d, dtlog_table_t* right, 
+            dtlog_table_t* left_delete, dtlog_table_t* left_add);
+
+

+The external function is always invoked before doing join or union. The function +must return false if the given table must be computed by a union or join +operation. right denotes the change of some table on the right side, either +addition or deletion. left_delete and left_add are empty when +the function is invoked and should be provisioned during invocation. +

+

+External function may keep state for its computation. After dtlog_opr() +finishes, the function is called with: +

+
+        (*ext_func)(d, NULL, NULL, NULL);
+
+

+which could be used to reset its state. +

+

+External function should only be used when union and join do not suffice. +OVN datalog internal APIs must be used to do the computation. +

+
diff --git a/tests/automake.mk b/tests/automake.mk index a9ebf91..9736587 100644 --- a/tests/automake.mk +++ b/tests/automake.mk @@ -90,6 +90,7 @@ TESTSUITE_AT = \ tests/vlog.at \ tests/vtep-ctl.at \ tests/auto-attach.at \ + tests/datalog.at \ tests/ovn.at \ tests/ovn-nbctl.at \ tests/ovn-sbctl.at \ @@ -330,6 +331,7 @@ tests_ovstest_SOURCES = \ tests/test-cmap.c \ tests/test-conntrack.c \ tests/test-csum.c \ + tests/test-datalog.c \ tests/test-flows.c \ tests/test-hash.c \ tests/test-heap.c \ diff --git a/tests/datalog.at b/tests/datalog.at new file mode 100644 index 0000000..c12f308 --- /dev/null +++ b/tests/datalog.at @@ -0,0 +1,11 @@ +AT_BANNER([datalog]) + +AT_SETUP([datalog engine]) +AT_KEYWORDS([ovn]) + +# test-datalog has a set of cases. run it alone to see its output. +AT_CHECK([ovstest test-datalog test], [0], [ignore], [PASS +]) + +AT_CLEANUP + diff --git a/tests/test-datalog.c b/tests/test-datalog.c new file mode 100644 index 0000000..4569f9f --- /dev/null +++ b/tests/test-datalog.c @@ -0,0 +1,1824 @@ +/* Copyright (c) 2016 VMware, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef USE_OUTSIDE_OVS +#include +#include "ovstest.h" +#include "ovn/lib/datalog.h" +#include "ovn/lib/datalog-private.h" +#else +#include "datalog.h" +#include "datalog-private.h" +#endif + +static int32_t dtlog_tst_no_cases_total = 0; +static int32_t dtlog_tst_no_cases_failed = 0; + +static void +t_assert(bool r, const char* s) +{ + dtlog_tst_no_cases_total++; + if (r) return; + dtlog_tst_no_cases_failed++; + printf("FAILED: %s\n",s); +} + +static void +t_assert1(bool r, const char* s) +{ + if (r) return; + printf("FAILED: %s\n",s); +} + +static void +t_sum(void) +{ + printf(" test result %d/%d failed\n", + dtlog_tst_no_cases_failed, dtlog_tst_no_cases_total); +} + +/* -------------------------------------------------------------------------- + * TEST CASES + * -------------------------------------------------------------------------- + */ + +struct test_align_s { + int32_t a; + /*char b; */ + void* c[0]; +}; + +static void +gen_str(char* p, int32_t len) +{ + int32_t i; + for (i = 0;i < len;i++) *p++ = (random() % 52) + 65; + *p++ = 0; +} + +static void +test_collections(void) +{ + char buf[1024]; + { + /* null will be printed. */ + char* p = NULL; + printf("- hello log %s\n", p); + + struct test_align_s s; + printf( + "- size of types\n %p char(%lu) int(%lu) " + "long(%lu) llong(%lu) %d %d\n", + &s, sizeof(char), sizeof(int), sizeof(long), sizeof(long long), + 0 /* (int)((char*)(&s.b) - (char*)(&s.a)) */, + (int)((char*)s.c - (char*)(&s.a))); + + t_assert(sizeof(void*) >= sizeof(int32_t), "holding int in ptr"); + /* Nested variable with same name. */ + int i; for (i = 0;i < 5;i++) { + int i; for (i = 0;i < 5;i++) { + } + } + } + + { /* Check integer could be stored as pointer. 
*/ + void* p1; + void* p2; + + int32_t i; + char* c1 = (char*)&p1; + char* c2 = (char*)&p2; + + for (i = 0;i < sizeof(void*);i++) { + c1[i] = 0x07; c2[i] = 0x70; + } + printf(" int as ptr %p %p\n", p1, p2); + + p1 = 0; p2 = 0; + printf(" int as ptr %p %p\n", p1, p2); + t_assert(p1 == p2, "int as ptr, compare 0 error"); + t_assert(dtlog_p2i(p1) == 0, "int as ptr, int 0 error"); + + p1 = dtlog_i2p(-1); p2 = dtlog_i2p(-1); + printf(" int as ptr %p %p\n", p1, p2); + t_assert(p1 == p2, "int as ptr, compare -1 error"); + t_assert(dtlog_p2i(p2) == -1, "int as ptr, int -1 error"); + printf("- int as ptr\n"); + } + + { /* check bitset */ + dtlog_bits_t* bs = dtlog_bitset_init(NULL); + dtlog_bits_t* bs1 = dtlog_bitset_init(NULL); + + bs->size = 1; + bs->items = realloc(bs->items, bs->size * sizeof(int32_t)); + dtlog_bitset_set(bs, 0); + dtlog_bitset_set(bs, 31); + t_assert(bs->size == 1, "bitmap, size not 1 after set bit 31"); + dtlog_bitset_set(bs, 32); + t_assert(bs->size == 2, "bitmap, size not 2 after set bit 32"); + + t_assert(dtlog_bitset_get( + bs, 0) && dtlog_bitset_get(bs, 31) && dtlog_bitset_get(bs, 32), + "bitmap, not set on bit 0, 31, 32"); + + dtlog_bitset_set(bs1, 64); + dtlog_bitset_set(bs1, 31); + dtlog_bitset_and(bs, bs1); + t_assert(!dtlog_bitset_empty(bs) && !dtlog_bitset_get(bs, 0) && + dtlog_bitset_get(bs, 31) && !dtlog_bitset_get(bs, 32) && + bs->size == 2, "bitmap, AND failed"); + + dtlog_bitset_free(bs); + dtlog_bitset_free(bs1); + printf("- bitmap\n"); + } + + { /* Check array. 
*/ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + int32_t i; + dtlog_ary_t ary1; + dtlog_ary_t* ary2; + dtlog_array_init(&ary1, 0, 0, gv); + ary2 = dtlog_array_init(NULL, 0, 5, gv); + + for (i = 0;i < 6;i++) { + dtlog_array_add(ary2, dtlog_i2p(i)); + dtlog_array_add(&ary1, dtlog_i2p(i * 10)); + } + + t_assert(ary2->len == 10, "array realloc failed"); + dtlog_array_set(ary2, 2, dtlog_i2p(20)); + t_assert(dtlog_p2i(dtlog_array_get(ary2, 1)) == 1 && + dtlog_p2i(dtlog_array_get(ary2, 5)) == 5 && + dtlog_p2i(dtlog_array_get(ary2, 2)) == 20, + "array get or set failed"); + + dtlog_array_ins(&ary1, 0, dtlog_i2p(1)); + dtlog_array_ins(&ary1, 1, dtlog_i2p(2)); + dtlog_array_ins(&ary1, 8, dtlog_i2p(100)); + int32_t v = dtlog_p2i(dtlog_array_rmv(&ary1, 3)); + t_assert(v == 10, "array insert and remove failed"); + t_assert(dtlog_array_size(&ary1) == 8, "array size failed"); + t_assert(dtlog_array_look_for(&ary1, dtlog_i2p(100)) == 7, + "array look for failed"); + + DTLOG_ARRAY_ALL_INT(&ary1, i) + printf(" %d ", i); + if (i == 40) break; + DTLOG_ARRAY_END + + dtlog_array_free(&ary1); + dtlog_array_free(ary2); + dtlog_set_free(gv); + printf("\n- array\n"); + } + + { /* Check hash code for string and int. Check map. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + int32_t l1 = 0, l2 = 0; + int32_t c1 = dtlog_hash_code_byte("hello", &l1); + int32_t c2 = dtlog_hash_code_byte("world ", &l2); + printf(" hash code len(%d) %x len(%d) %x\n", l1, c1, l2, c2); + t_assert(l1 == 5 && l2 == 6, "hash code len"); + + dtlog_map_t* map1 = dtlog_map_init(NULL, + DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_INT32), 11, gv); + + /* All on same slot. 
*/ + dtlog_map_add(map1, dtlog_i2p(5), dtlog_i2p(6)); + dtlog_map_add(map1, dtlog_i2p(16), dtlog_i2p(17)); + dtlog_map_add(map1, dtlog_i2p(27), dtlog_i2p(28)); + + t_assert(dtlog_p2i(dtlog_map_get(map1, dtlog_i2p(5))) == 6 && + dtlog_p2i(dtlog_map_get(map1, dtlog_i2p(16))) == 17 && + dtlog_p2i(dtlog_map_get(map1, dtlog_i2p(27))) == 28 && + dtlog_map_size(map1) == 3, + "map size, get, or set"); + + t_assert(map1->len == 11, "map int 0"); + int32_t sz = 0; + + sz = dtlog_hash_print(buf, sz, map1, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{27->28,16->17,5->6}") == 0, "map int 1"); + + /* Check link operation. */ + t_assert(dtlog_p2i(dtlog_map_del(map1, dtlog_i2p(16))) == 17, + "map int 2"); + sz = 0; sz = dtlog_hash_print(buf, sz, map1, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{27->28,5->6}") == 0, "map int 3"); + + t_assert(dtlog_p2i(dtlog_map_del(map1, dtlog_i2p(5))) == 6, + "map int 4"); + sz = 0; sz = dtlog_hash_print(buf, sz, map1, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{27->28}") == 0, "map int 5"); + + dtlog_map_add(map1, dtlog_i2p(5), dtlog_i2p(6)); + sz = 0; sz = dtlog_hash_print(buf, sz, map1, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{5->6,27->28}") == 0, "map int 6"); + + t_assert(dtlog_p2i(dtlog_map_del(map1, dtlog_i2p(5))) == 6, + "map int 7"); + sz = 0; sz = dtlog_hash_print(buf, sz, map1, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{27->28}") == 0, "map int 8"); + + t_assert(dtlog_p2i(dtlog_map_del(map1, dtlog_i2p(27))) == 28, + "map int 9"); + sz = 0; sz = dtlog_hash_print(buf, sz, map1, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{}") == 0 && dtlog_map_size(map1) == 0, + "map int 10"); + + /* Break in map. 
*/ + int32_t i = 0; + dtlog_map_add(map1, dtlog_i2p(77), dtlog_i2p(66)); + dtlog_map_add(map1, dtlog_i2p(33), dtlog_i2p(55)); + dtlog_map_add(map1, dtlog_i2p(77), dtlog_i2p(88)); + t_assert(dtlog_map_has(map1, dtlog_i2p(33)), "map int 11a"); + + t_assert(dtlog_p2i(dtlog_map_get(map1, dtlog_i2p(77))) == 88 && + dtlog_map_size(map1) == 2, "map int 11"); + + DTLOG_MAP_ALL(map1, node) + node++; /* Make no warning. */ + i++; + break; + DTLOG_MAP_END + t_assert(i == 1, "map int 12"); + + /* Map rehash. */ + for (i = 0;i < 8192;i++) + dtlog_map_add(map1, dtlog_i2p(i), dtlog_i2p(i * 2)); + + printf(" map len = %d size = %d\n", map1->len, map1->size); + t_assert(dtlog_map_size(map1) == 8192, "map int 13"); + for (i = 0;i < 8192;i++) + t_assert1(dtlog_p2i(dtlog_map_get(map1, dtlog_i2p(i))) == i * 2, + "map set/get 14"); + + for (i = 0;i < 8192;i++) + t_assert1(dtlog_p2i(dtlog_map_del(map1, dtlog_i2p(i))) == i * 2, + "map set/get 15"); + printf(" map len = %d size = %d\n", map1->len, map1->size); + t_assert(dtlog_map_size(map1) == 0, "map int 16"); + + dtlog_map_free(map1); + dtlog_set_free(gv); + printf("- map, int and string\n"); + } + + { /* Check set on string. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_set_t set1; + dtlog_set_init(&set1, DTLOG_KEY(DTLOG_T_STR), 0, gv); + char* app1 = "apple"; + char* app2 = malloc(strlen(app1) + 1); + strcpy(app2, app1); + t_assert(app1 != app2, "two apple string"); + + dtlog_set_add(&set1, app1); + dtlog_set_add(&set1, app2); + dtlog_set_add(&set1, "cherry"); + dtlog_set_add(&set1, "kiwi"); + /* test_hash_dump(&set1); */ + + t_assert(dtlog_set_has(&set1, app1) && dtlog_set_has(&set1, app2) && + dtlog_set_get(&set1, app2) == app1 && + dtlog_set_size(&set1) == 3, "right apple"); + + int32_t i = 0; + DTLOG_SET_ALL(&set1, node, const char*) + node++; /* Make no warning. */ + i++; + break; + DTLOG_SET_END + t_assert(i == 1, "set exit"); + + /* Check set remove. 
*/ + dtlog_set_t* set2 = dtlog_set_init(NULL, + DTLOG_KEY(DTLOG_T_INT32), 11, gv); + t_assert(set2->len == 11, "set int 0"); + + dtlog_set_add(set2, dtlog_i2p(5)); + dtlog_set_add(set2, dtlog_i2p(16)); + dtlog_set_add(set2, dtlog_i2p(27)); + /* test_hash_dump(set2); */ + + int32_t sz = 0; + sz = 0; sz = dtlog_hash_print(buf, sz, set2, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{27,16,5}") == 0, "set int 1"); + + DTLOG_SET_ALL_INT(set2, node) + if (node == 16) DTLOG_SET_DEL_ITEM; + DTLOG_SET_END + + sz = 0; sz = dtlog_hash_print(buf, sz, set2, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{27,5}") == 0, "set int 3"); + + DTLOG_SET_ALL_INT(set2, node) + if (node == 5) DTLOG_SET_DEL_ITEM; + DTLOG_SET_END + + sz = 0; sz = dtlog_hash_print(buf, sz, set2, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{27}") == 0, "set int 5"); + + dtlog_set_add(set2, dtlog_i2p(5)); + sz = 0; sz = dtlog_hash_print(buf, sz, set2, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{5,27}") == 0, "set int 6"); + + DTLOG_SET_ALL_INT(set2, node) + if (node == 5) { DTLOG_SET_DEL_ITEM; break; } + DTLOG_SET_END + + sz = 0; sz = dtlog_hash_print(buf, sz, set2, false); buf[sz++] = 0; + sz = 0; t_assert(strcmp(buf, "{27}") == 0, "set int 8"); + + DTLOG_SET_ALL_INT(set2, node) + if (node == 27) { DTLOG_SET_DEL_ITEM; break; } + DTLOG_SET_END + + sz = 0; sz = dtlog_hash_print(buf, sz, set2, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{}") == 0 && dtlog_set_size(set2) == 0, + "set int 10"); + + dtlog_set_add(set2, 0); + dtlog_set_add(set2, dtlog_i2p(5)); dtlog_set_add(set2, dtlog_i2p(16)); + dtlog_set_add(set2, dtlog_i2p(27)); + dtlog_set_add(set2, dtlog_i2p(6)); dtlog_set_add(set2, dtlog_i2p(17)); + + sz = 0; sz = dtlog_hash_print(buf, sz, set2, true); buf[sz++] = 0; + printf(" set: \n%s", buf); + + i = 0; + DTLOG_SET_ALL_INT(set2, node) + node++; /* Make no warning. 
*/ + if (i >= 2 && i <= 4) DTLOG_SET_DEL_ITEM; + i++; + DTLOG_SET_END + + sz = 0; sz = dtlog_hash_print(buf, sz, set2, false); buf[sz++] = 0; + t_assert(strcmp(buf, "{0,27,6}") == 0 && dtlog_set_size(set2) == 3, + "set int 11"); + + DTLOG_SET_ALL_INT(set2, node) + node++; /* Make no warning. */ + DTLOG_SET_DEL_ITEM; + DTLOG_SET_END + + sz = 0; sz = dtlog_hash_print(buf, sz, set2, false); buf[sz++] = 0; + t_assert( + strcmp(buf, "{}") == 0 && dtlog_set_size(set2) == 0, "set int 12"); + + free(app2); + dtlog_set_free(&set1); + dtlog_set_free(set2); + dtlog_set_free(gv); + printf("- set, int and string\n"); + } + { /* Check for hash code collision. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_map_t* map1 = dtlog_map_init(NULL, + DTLOG_KEY(DTLOG_T_TST_INT32), 11, gv); + + dtlog_map_add(map1, dtlog_i2p(205), dtlog_i2p(205)); + dtlog_map_add(map1, dtlog_i2p(5), dtlog_i2p(105)); + dtlog_map_add(map1, dtlog_i2p(16), dtlog_i2p(116)); + dtlog_map_add(map1, dtlog_i2p(206), dtlog_i2p(206)); + + int32_t sz = 0; + sz = dtlog_hash_print(buf, sz, map1, true); buf[sz++] = 0; + printf("%s", buf); + + t_assert(dtlog_p2i(dtlog_map_get(map1, dtlog_i2p(205))) == 205 && + dtlog_p2i(dtlog_map_get(map1, dtlog_i2p(16))) == 116 && + dtlog_p2i(dtlog_map_get(map1, dtlog_i2p(5))) == 105, + "hash, tst int32 1"); + + int32_t i; + for (i = 0;i < 1000;i++) + dtlog_map_add(map1, dtlog_i2p(i), dtlog_i2p((1000 + i))); + + printf(" map len = %d size = %d\n", map1->len, dtlog_map_size(map1)); + t_assert(dtlog_map_size(map1) == 1000, "hash, tst int32 2"); + + for (i = 0;i < 1000;i++) + t_assert1(dtlog_p2i( + dtlog_map_get(map1, dtlog_i2p(i))) == 1000 + i, + "hash, tst int32 3"); + + for (i = 0;i < 1000;i++) + t_assert1(dtlog_p2i( + dtlog_map_del(map1, dtlog_i2p(i))) == 1000 + i, + "hash, tst int32 4"); + + printf(" map len = %d size = %d\n", map1->len, dtlog_map_size(map1)); + t_assert(dtlog_map_size(map1) == 0, "hash, tst int32 5"); + + 
dtlog_map_free(map1); + dtlog_set_free(gv); + printf("- hash, code collision\n"); + } + { /* Check large set. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + char* buf = malloc(20 * 100000); + int32_t i; + + for (i = 0;i < 100000;i++) + gen_str(buf + i * 20, 19); + + dtlog_set_t* set = dtlog_set_init(NULL, + DTLOG_KEY(DTLOG_T_STR), 0, gv); + + printf(" gen set of strings\n"); + for (i = 0;i < 100000;i++) dtlog_set_add(set, buf + i * 20); + printf(" add set of strings done\n"); + + t_assert(dtlog_set_size(set) == 100000, "large set size"); + printf(" set len = %d size = %d\n", set->len, dtlog_set_size(set)); + + for (i = 0;i < 100000;i++) + t_assert1(dtlog_set_get(set, buf + i * 20) == buf + i * 20, + "large set"); + + for (i = 0;i < 100000;i++) dtlog_set_del(set, buf + i * 20); + t_assert(dtlog_set_size(set) == 0, "large set size 1"); + + free(buf); + dtlog_set_free(set); + dtlog_set_free(gv); + printf("- set, str, large collection\n"); + } +} + +static void +test_tables(void) +{ + char buf[1024]; + { /* Check value collection. 
*/ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_value_t* nstr = dtlog_value_init("", 0, gv); + dtlog_value_t* nstr1 = dtlog_value_init("", 0, gv); + t_assert(nstr == nstr1, "null string equal"); + + dtlog_value_t* nhello = dtlog_value_init("hello", 0, gv); + char world[7] = "worlda"; + dtlog_value_t* nworld = dtlog_value_init(world, 5, gv); + + char world1[5] = { 'a', 'b', 0, 'c', 'd' }; + dtlog_value_t* nabcd = dtlog_value_init(world1, 5, gv); + + char hello1[5] = { 'h', 'e', 'l', 'l', 'o' }; + dtlog_value_t* nhello1 = dtlog_value_init(hello1, 5, gv); + dtlog_value_t* nab = dtlog_value_init("ab", 0, gv); + + char world2[5] = { 'a', 'b', 0, 'c', 'd' }; + dtlog_value_t* nabcd2 = dtlog_value_init(world2, 5, gv); + + dtlog_value_ref(nstr); + t_assert(dtlog_set_size(gv) == 5, "value set 1"); + + t_assert(nhello == nhello1 && nabcd == nabcd2 && + nabcd != nab && strcmp(nworld->value.a, "world") == 0, + "value set 2"); + + t_assert(nhello->ref_no == 2 && nabcd->ref_no == 2 && + nab->ref_no == 1 && nworld->ref_no == 1 && nstr->ref_no == 3, + "value set 3"); + + dtlog_value_free(nhello, gv); + dtlog_value_free(nhello, gv); + dtlog_value_free(nabcd, gv); + + int32_t sz = 0; + sz = dtlog_hash_print(buf, sz, gv, true); buf[sz++] = 0; + printf("%s", buf); + + t_assert(dtlog_set_size(gv) == 4 && + dtlog_set_has(gv, nabcd), "value set 4"); + + dtlog_set_free(gv); + printf("- value, ref_no\n"); + } + { /* Check tuple. 
*/ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_tuple_t* tuple1 = dtlog_tuple_init_str("2:f0:f1:f2", gv); + dtlog_tuple_t* tuple2 = dtlog_tuple_init_str("10:f2:f0:f3:f1", gv); + + int32_t sz = 0; + sz = dtlog_tuple_print(buf, sz, tuple1); + buf[sz++] = '|'; + sz = dtlog_tuple_print(buf, sz, tuple2); + buf[sz++] = 0; + printf(" tuple %s %d\n", buf, sz); + t_assert(strcmp("2:f0:f1:f2|10:f2:f0:f3:f1", buf) == 0 && sz == 26, + "tuple str init"); + + sz = 0; sz = dtlog_hash_print(buf, sz, gv, true); buf[sz++] = 0; + printf("%s", buf); + + dtlog_value_t* v_f2 = dtlog_value_init("f2", 0, gv); + t_assert(v_f2->ref_no == 3, "tuple ref"); + dtlog_tuple_free(tuple1, gv, true); + dtlog_tuple_free(tuple2, gv, true); + + sz = 0; sz = dtlog_hash_print(buf, sz, gv, true); buf[sz++] = 0; + printf("%s", buf); + + t_assert(v_f2->ref_no == 1 && dtlog_set_size(gv) == 1, + "tuple ref 1"); + + dtlog_set_free(gv); + printf("- tuple\n"); + } + { /* Check table operation. 
*/ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_ary_t* key0 = dtlog_array_init(NULL, 0, 0, gv); + dtlog_ary_t* key01 = dtlog_array_init(NULL, 0, 0, gv); + dtlog_array_add(key0, 0); + dtlog_array_add(key01, 0); dtlog_array_add(key01, dtlog_i2p(2)); + + dtlog_ints_t* ikey0 = dtlog_int_tuple_init(key0); + dtlog_ints_t* ikey01 = dtlog_int_tuple_init(key01); + dtlog_array_free(key0); + dtlog_array_free(key01); + + dtlog_value_t* val_a = dtlog_value_init("a", 0, gv); + dtlog_table_t* tbl = dtlog_table_init(NULL, 1, 3, 0, gv); + + dtlog_tuple_t* tkab = dtlog_tuple_init_str("0:a", gv); + dtlog_tuple_t* tkab2 = dtlog_tuple_init_str("0:a:b", gv); + + dtlog_tuple_t* tuple1 = dtlog_tuple_init_str("1:a:c:b", gv); + dtlog_tuple_t* tuple2 = dtlog_tuple_init_str("2:a:e:b", gv); + dtlog_tuple_t* tuple21 = dtlog_tuple_init_str("2:a:e:b", gv); + dtlog_tuple_t* tuple3 = dtlog_tuple_init_str("3:a:f:d", gv); + dtlog_tuple_t* tuple4 = dtlog_tuple_init_str("4:b:g:d", gv); + + /* Create index later. 
*/ + dtlog_table_add(tbl, tuple1); dtlog_table_add(tbl, tuple2); + dtlog_table_add(tbl, tuple3); dtlog_table_add(tbl, tuple4); + dtlog_table_remove(tbl, tuple2); + + t_assert(val_a->ref_no == 6, "table index c0"); + + int32_t sz = 0; + DTLOG_SET_ALL(&tbl->tuples, tuple, dtlog_tuple_t*) + sz = dtlog_tuple_print(buf, sz, tuple); + buf[sz++] = '|'; + DTLOG_SET_END + + buf[sz++] = 0; + printf(" table tuples: %s\n", buf); + t_assert(strcmp(buf, "4:b:g:d|1:a:c:b|3:a:f:d|") == 0, + "table index 0"); + + dtlog_table_add(tbl, tuple21); + int32_t i0 = dtlog_table_add_index(tbl, ikey0); + dtlog_tuple_t* set = dtlog_index_get_index(tbl, tkab, i0); + + sz = 0; + DTLOG_INDEX_ALL(set, i0, t) + sz = dtlog_tuple_print(buf, sz, t); + buf[sz++] = '|'; + DTLOG_INDEX_END + buf[sz++] = 0; + + printf(" search table on index0 (a): %s\n", buf); + t_assert(strcmp(buf, "2:a:e:b|3:a:f:d|1:a:c:b|") == 0, + "table index 1"); + + int32_t i01 = dtlog_table_add_index(tbl, ikey01); + dtlog_int_tuple_free(ikey0); + dtlog_int_tuple_free(ikey01); + + set = dtlog_index_get_index(tbl, tkab2, i01); + sz = 0; sz = dtlog_index_print(buf, sz, tbl); buf[sz++] = 0; + printf("%s", buf); + + sz = 0; + DTLOG_INDEX_ALL(set, i01, t) + sz = dtlog_tuple_print(buf, sz, t); + buf[sz++] = '|'; + DTLOG_INDEX_END + + buf[sz++] = 0; + printf(" search table on index02 (a, b): %s\n", buf); + t_assert(strcmp(buf, "2:a:e:b|1:a:c:b|") == 0, "table index 2"); + + printf(" add tuple when index is defined\n"); + dtlog_tuple_t* tuple5 = dtlog_tuple_init_str("5:c:f:d", gv); + dtlog_tuple_t* tuple6 = dtlog_tuple_init_str("6:b:h:d", gv); + dtlog_tuple_t* tuple7 = dtlog_tuple_init_str("7:a:s:b", gv); + dtlog_table_add(tbl, tuple5); dtlog_table_add(tbl, tuple6); + dtlog_table_add(tbl, tuple7); + + sz = 0; sz = dtlog_index_print(buf, sz, tbl); buf[sz++] = 0; + printf("%s", buf); + + printf(" del tuple when index is defined\n"); + dtlog_table_remove(tbl, tuple1); + dtlog_table_remove(tbl, tuple21); + dtlog_table_remove(tbl, tuple7); + 
t_assert(val_a->ref_no == 4, "table index c1"); + + sz = 0; sz = dtlog_index_print(buf, sz, tbl); buf[sz++] = 0; + printf("%s", buf); + + sz = 0; + DTLOG_SET_ALL(&tbl->tuples, tuple, dtlog_tuple_t*) + sz = dtlog_tuple_print(buf, sz, tuple); + buf[sz++] = '|'; + DTLOG_SET_END + + buf[sz++] = 0; + printf(" table tuples: %s\n", buf); + t_assert(strcmp(buf, "4:b:g:d|6:b:h:d|3:a:f:d|5:c:f:d|") == 0, + "table index 3"); + + dtlog_table_free(tbl); + dtlog_tuple_free(tkab, gv, true); + dtlog_tuple_free(tkab2, gv, true); + dtlog_value_free(val_a, gv); + + t_assert(dtlog_set_size(gv) == 0, "table values released"); + dtlog_set_free(gv); + printf("- tables and tuples\n"); + } +} + +static void +test_sort(void) +{ + char buf[1024]; + + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_map_t right1, right2, rules; + dtlog_set_init(&right1, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + dtlog_set_init(&right2, DTLOG_KEY(DTLOG_T_VALUE), 0, gv); + dtlog_map_init(&rules, + DTLOG_KEY(DTLOG_T_VALUE) | DTLOG_VALUE(DTLOG_T_SET), 0, gv); + + dtlog_set_add(&right1, dtlog_value_init("1", 0, gv)); + dtlog_set_add(&right2, dtlog_value_init("1", 0, gv)); + dtlog_set_add(&right2, dtlog_value_init("2", 0, gv)); + + dtlog_map_add(&rules, dtlog_value_init("2", 0, gv), &right1); + dtlog_map_add(&rules, dtlog_value_init("3", 0, gv), &right2); + + int32_t sz = 0; + sz = dtlog_hash_print(buf, sz, &rules, false); buf[sz++] = 0; + printf(" G=%s\n", buf); + + dtlog_ary_t order; + dtlog_map_t in_nodes, out_nodes; + dtlog_topo_sort(&rules, &order, &in_nodes, &out_nodes); + + sz = 0; + sz = dtlog_coll_print(buf, sz, &order, DTLOG_T_ARRAY, false); + sz = dtlog_coll_print(buf, sz, &in_nodes, DTLOG_T_SET, false); + sz = dtlog_coll_print(buf, sz, &out_nodes, DTLOG_T_SET, false); + buf[sz++] = 0; + + t_assert(strcmp(buf, "[1,2,3]{1}{3}") == 0, "topo sort 1"); + dtlog_array_free(&order); + dtlog_set_free(&in_nodes); + dtlog_set_free(&out_nodes); + + dtlog_ary_t ary1, ary2, ary3; + 
dtlog_array_init(&ary1, DTLOG_KEY(DTLOG_T_INT32), 0, NULL); + dtlog_array_init(&ary2, DTLOG_KEY(DTLOG_T_INT32), 0, NULL); + dtlog_array_init(&ary3, DTLOG_KEY(DTLOG_T_INT32), 0, NULL); + + int32_t v1[] = { 9, 7, 9, 7, 5 }; + int32_t v2[] = { 1, 2, 3, 4, 5 }; + int32_t v3[] = { 5, 4, 3, 2, 1 }; + + int32_t i; + for (i = 0;i < 5;i++) { + dtlog_array_add(&ary1, dtlog_i2p(v1[i])); + dtlog_array_add(&ary2, dtlog_i2p(v2[i])); + dtlog_array_add(&ary3, dtlog_i2p(v3[i])); + } + dtlog_sort_array(0, &ary1, &ary2, &ary3); + + sz = 0; + int32_t iv1 = dtlog_insert_item(8, &ary1, dtlog_i2p(6), + dtlog_i2p(10), &ary2, &ary3); + + sz = dtlog_array_print(buf, sz, &ary1, false); + sz = dtlog_array_print(buf, sz, &ary2, false); + sz = dtlog_array_print(buf, sz, &ary3, false); + int32_t iv2 = dtlog_insert_item(8, &ary1, dtlog_i2p(7), + dtlog_i2p(11), &ary2, &ary3); + sz = dtlog_array_print(buf, sz, &ary1, false); + sz = dtlog_array_print(buf, sz, &ary2, false); + sz = dtlog_array_print(buf, sz, &ary3, false); + buf[sz++] = 0; + printf(" sort: %s\n", buf); + + t_assert(iv1 == 3 && iv2 == 4 && strcmp(buf, + "[5,7,7,8,9,9][5,2,4,6,1,3][1,4,2,10,5,3]" + "[5,7,7,8,8,9,9][5,2,4,6,7,1,3][1,4,2,10,11,5,3]") == 0, "sort insert"); + + dtlog_array_free(&ary1); dtlog_array_free(&ary2); dtlog_array_free(&ary3); + dtlog_set_free(gv); + printf("- sort and insert\n"); +} + +static void +test_sync(void) +{ + char buf[1024]; + { + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_sync_init( + "table(a,b) : x(a,'aa')\n" + "# comment \n" + "y(b,-, -) ; z(aparam) > y(x) \n" + ". 
# last line" + ,gv); + + DTLOG_T2(dtlog_map_t, dtlog_value_t*, dtlog_ary_t*) map; + dtlog_sync_parse(&map); + + int32_t sz = 0; + sz = dtlog_hash_print(buf, sz, &map, false); buf[sz++] = 0; + printf(" sync %s\n", buf); + + t_assert(0 == strcmp(buf, + "{z->[[u,z,t,aparam],[,y,t,x]]," + "table->[[j,table,t,a,t,b],[,x,t,a,s,aa],[,y,t,b,-,,-,]]}"), + "sync check 1"); + dtlog_map_free(&map); + dtlog_set_free(gv); + printf("- log syntax\n"); + } + { + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_sync_init( + /* + "Xx2(a) : x1(a,'aa'); X3(a) : x1(a, b) Xx2(b); _EXTERNAL(x1, func)." + // Input, output cannot be external. + "Xx2(a) : x1(a,'aa'); X3(a) : x1(a, b) Xx2(b); _EXTERNAL(X5, func)." + // Invalid external. + "X2(a) : X1(a,'aa'); X3(a) : X1(a, b, c) X2(b)." + // Upper / lower case wrong. + "X2(a, b) : x2(b) x1(a) x1(a) x1(a)." // Join more than twice. + "X2(a) > x1(a) x1(a) x1(a)." // Union more than twice. + "X2(a, b) > x1(a) x1(a, b)." // Table size mismatch. + "X2(a, b) : x1(a) x2(a, b); X3(v) : x2(z)." // Table size mismatch. + "X2(a, 'aa') > x1(a)." // Constant not allowed in left. + "X2(a, -) : x1(a)." // Ignored not allowed in left. + "X2(a, b) > x1(a, c)." // Union param not found. + "X2(a) : x1(a, c)." // Param not used should be 'ignored'. + "X2(a, z) : x1(a, -)." // Param not defined. + */ + + "Xx2(a) : x1(a,'aa', -); X3(a) : Xx2(a) x1(a, -, b) Xx2(b)." + /* Correct. */ + /* + " Span(ls_id, host_id) : ls(ls_id, vif) vif_place(host_id, vif); \n" + " HOST_LS(host1, ls, host2) : Span(ls, host1) Span(ls, host2). 
" + */ + , gv); + + DTLOG_T2(dtlog_map_t, dtlog_value_t*, dtlog_ary_t*) map; + dtlog_sync_parse(&map); + + int32_t sz; + sz = 0; sz = dtlog_hash_print(buf, sz, &map, false); buf[sz++] = 0; + printf(" sync %s\n", buf); + + dtlog_rule_set_t rs; + dtlog_rule_set_init(&rs, gv); + dtlog_sem_process(&rs, &map); + sz = 0; sz = dtlog_rule_set_print(buf, sz, &rs); buf[sz++] = 0; + printf(" sem=\n%s\n", buf); + + dtlog_rule_set_free(&rs); + dtlog_map_free(&map); + dtlog_set_free(gv); + printf("- log semantics\n"); + } +} + +static void +test_join(void) +{ + char buf[1024]; + { + /* Do join table 2. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_table_t* tbl1 = dtlog_table_init(NULL, 0, 3, 0, gv); + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 1, 4, 0, gv); + + dtlog_table_add(tbl1, dtlog_tuple_init_str("2 :1:2:7", gv)); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1 :1:3:8", gv)); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1 :1:5:9", gv)); + + dtlog_table_add(tbl2, dtlog_tuple_init_str("1 :3:1:2:0", gv)); + dtlog_table_add(tbl2, dtlog_tuple_init_str("1 :4:1:2:0", gv)); + dtlog_table_add(tbl2, dtlog_tuple_init_str("9 :4:1:5:0", gv)); + dtlog_table_add(tbl2, dtlog_tuple_init_str("1 :4:1:5:1", gv)); + + dtlog_ary_t* ints = + dtlog_array_init(NULL, DTLOG_KEY(DTLOG_T_INT32), 0, gv); + + dtlog_array_add(ints, dtlog_i2p(1)); + dtlog_array_add(ints, dtlog_i2p(2)); + dtlog_array_add(ints, dtlog_i2p(3)); + dtlog_ints_t* key0 = dtlog_int_tuple_init(ints); + dtlog_array_free(ints); + + dtlog_join_param_t jps; + dtlog_join_param_t* jp = dtlog_join_param_init(&jps, key0, gv); + + dtlog_array_add(&jp->select1i, 0); + dtlog_array_add(&jp->select1i, dtlog_i2p(1)); + dtlog_array_add(&jp->select1i, 0); + dtlog_array_add(&jp->select1, NULL); + dtlog_array_add(&jp->select1, NULL); + dtlog_array_add(&jp->select1, dtlog_value_init("0", 0, gv)); + + dtlog_array_add(&jp->rem1, 0); + dtlog_array_add(&jp->rem2, 0); + + dtlog_table_t* tbl3 = 
dtlog_tblopr_join(tbl1, tbl2, jp); + dtlog_join_param_free(jp); + + int32_t sz = 0; + sz = dtlog_table_print(buf, sz, tbl2, true); buf[sz++] = 0; + printf(" cond join tbl2 index=%s\n", buf); + sz = 0; sz = dtlog_table_print(buf, sz, tbl3, false); buf[sz++] = 0; + printf(" cond join result=%s\n", buf); + t_assert(0 == strcmp(buf, "{2:1:3,3:1:4}"), "cond join 1"); + + dtlog_table_free(tbl1); + dtlog_table_free(tbl2); + dtlog_table_free(tbl3); + dtlog_set_free(gv); + } + { + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + + dtlog_table_t* tbl1 = dtlog_table_init(NULL, 0, 1, 0, gv); + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 1, 3, 0, gv); + + dtlog_table_add(tbl1, dtlog_tuple_init_str("2 :1", gv)); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1 :2", gv)); + + dtlog_table_add(tbl2, dtlog_tuple_init_str("8 :3:4:1", gv)); + dtlog_table_add(tbl2, dtlog_tuple_init_str("1 :3:5:0", gv)); + dtlog_table_add(tbl2, dtlog_tuple_init_str("1 :5:4:1", gv)); + + dtlog_ary_t* ints = dtlog_array_init(NULL, + DTLOG_KEY(DTLOG_T_INT32), 0, gv); + + dtlog_array_add(ints, dtlog_i2p(2)); + dtlog_ints_t* key0 = dtlog_int_tuple_init(ints); + dtlog_array_free(ints); + + dtlog_join_param_t jps; + dtlog_join_param_t* jp = dtlog_join_param_init(&jps, key0, gv); + + dtlog_array_add(&jp->select1i, 0); + dtlog_array_add(&jp->select1, dtlog_value_init("1", 0, gv)); + + dtlog_array_add(&jp->rem1, dtlog_i2p(0)); + dtlog_array_add(&jp->rem2, dtlog_i2p(1)); + + dtlog_table_t* tbl3 = dtlog_tblopr_join(tbl1, tbl2, jp); + dtlog_join_param_free(jp); + + int32_t sz = 0; + sz = dtlog_table_print(buf, sz, tbl2, true); buf[sz++] = 0; + + printf(" join tbl2 index=%s\n", buf); + sz = 0; sz = dtlog_table_print(buf, sz, tbl3, false); buf[sz++] = 0; + printf(" full join result=%s\n", buf); + t_assert(0 == strcmp(buf, "{2:2:4,4:1:4}"), "full join 1"); + + dtlog_table_free(tbl1); + dtlog_table_free(tbl2); + dtlog_table_free(tbl3); + dtlog_set_free(gv); + } + { + /* Do union. 
*/ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value( gv); + + dtlog_engine_t log; + dtlog_engine_init(&log, gv); + + dtlog_sync_init("X(a0, a2, a1) : x(a2, a1, -, 'cst', a0) .", gv); + DTLOG_T2(dtlog_map_t, dtlog_value_t*, dtlog_ary_t*) map; + + dtlog_sync_parse(&map); + dtlog_sem_process(&log.rule_set, &map); + dtlog_map_free(&map); + + dtlog_table_t* tbl1 = dtlog_table_init(NULL, 0, 5, 0, gv); + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 1, 3, 0, gv); + + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:a:b:c:cst:d", gv)); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:a:b:c:dst:d", gv)); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:a:b:d:cst:d", gv)); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:a:b:d:cst:e", gv)); + + dtlog_eng_do_union(&log, tbl1, tbl2); + + int32_t sz = 0; + sz = dtlog_table_print(buf, sz, tbl2, false); buf[sz++] = 0; + printf(" union = %s\n", buf); + t_assert(0 == strcmp(buf, "{1:e:a:b,2:d:a:b}"), "union 1"); + + dtlog_table_free(tbl1); dtlog_table_free(tbl2); + dtlog_engine_free(&log); + dtlog_set_free(gv); + } + { /* Do self join 1. 
*/ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = + dtlog_eng_parse( + "X(b, a, c) : x(a, b, 'v', -, c) x(b, 'w', -, a, c) .", gv); + + dtlog_table_t* tbl0_org = dtlog_map_get(&eng->tables, 0); + dtlog_table_add(tbl0_org, dtlog_tuple_init_str("1:a:b:c:d:e", gv)); + dtlog_table_add(tbl0_org, dtlog_tuple_init_str("1:a:y:v:a:e", gv)); + + dtlog_table_t* tbl1 = dtlog_table_init(NULL, 0, 5, 0, gv); + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 1, 3, 0, gv); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:w:w:v:w:e", gv)); + + dtlog_eng_do_join(eng, tbl1, tbl2); + int32_t sz = 0; + sz = dtlog_table_print(buf, sz, tbl2, false); buf[sz++] = 0; + printf(" self join simple match = %s\n", buf); + t_assert(0 == strcmp(buf, "{1:w:w:e}"), "self join 1"); + + dtlog_table_free(tbl1); dtlog_table_free(tbl2); + dtlog_engine_free(eng); + dtlog_set_free(gv); + } + { + /* Do self join 2. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = + dtlog_eng_parse("X(a, b, c) : x(a, b) x(a, c) .", gv); + + dtlog_table_t* tbl0_o = dtlog_map_get(&eng->tables, 0); + dtlog_table_t* tbl0 = dtlog_table_init(NULL, 0, 2, 0, gv); + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 1, 3, 0, gv); + dtlog_table_add(tbl0, dtlog_tuple_init_str("1:1:1", gv)); + dtlog_table_add(tbl0, dtlog_tuple_init_str("1:1:2", gv)); + dtlog_table_add(tbl0, dtlog_tuple_init_str("1:2:1", gv)); + + dtlog_eng_do_join(eng, tbl0, tbl2); + + int32_t sz = 0; + sz = dtlog_table_print(buf, sz, tbl2, false); buf[sz++] = 0; + printf(" self join simple initial = %s\n", buf); + t_assert(0 == strcmp(buf, + "{1:2:1:1,1:1:1:2,1:1:1:1,1:1:2:2,1:1:2:1}"), + "self join simple init 1"); + + dtlog_table_add(tbl0_o, dtlog_tuple_init_str("1:1:1", gv)); + dtlog_table_add(tbl0_o, dtlog_tuple_init_str("1:1:2", gv)); + dtlog_table_add(tbl0_o, dtlog_tuple_init_str("1:2:1", gv)); + + dtlog_table_t* tbl_0 = dtlog_table_init(NULL, 0, 2, 0, 
gv); + dtlog_table_t* tbl_2 = dtlog_table_init(NULL, 1, 3, 0, gv); + dtlog_table_add(tbl_0, dtlog_tuple_init_str("1:1:3", gv)); + + dtlog_eng_do_join(eng, tbl_0, tbl_2); + sz = 0; sz = dtlog_table_print(buf, sz, tbl_2, false); buf[sz++] = 0; + printf(" self join simple delta = %s\n", buf); + t_assert(0 == strcmp(buf, + "{1:1:1:3,1:1:3:2,1:1:3:3,1:1:2:3,1:1:3:1}"), + "self join simple delta 1"); + + dtlog_table_free(tbl_0); dtlog_table_free(tbl_2); + dtlog_table_free(tbl0); dtlog_table_free(tbl2); + dtlog_engine_free(eng); + dtlog_set_free(gv); + } + { + /* Do join 3. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = + dtlog_eng_parse( + "R(h, b, c, d) : h(h, p1, p2) p1(p1, -, b) p2(p2, d, c) .",gv); + + dtlog_table_t* tbl0_h = dtlog_map_get(&eng->tables, dtlog_i2p(0)); + dtlog_table_t* tbl0_p1 = dtlog_map_get(&eng->tables, dtlog_i2p(1)); + dtlog_table_t* tbl0_p2 = dtlog_map_get(&eng->tables, dtlog_i2p(2)); + + dtlog_table_add(tbl0_h, dtlog_tuple_init_str("1:h1:px:py", gv)); + dtlog_table_add(tbl0_p1, dtlog_tuple_init_str("1:p1:a1:b", gv)); + dtlog_table_add(tbl0_p1, dtlog_tuple_init_str("1:p1:a2:b", gv)); + dtlog_table_add(tbl0_p2, dtlog_tuple_init_str("1:p2:c:d", gv)); + + dtlog_table_t* tbl1 = dtlog_table_init(NULL, 0, 3, 0, gv); + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 3, 4, 0, gv); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:h1:p1:p2", gv)); + + dtlog_eng_do_join(eng, tbl1, tbl2); + int sz = 0; sz = dtlog_table_print(buf, sz, tbl2, false); buf[sz++] = 0; + printf(" join 3 simple = %s\n", buf); + t_assert(0 == strcmp(buf, "{2:h1:b:d:c}"), + "join 3 simple"); + + dtlog_table_free(tbl1); dtlog_table_free(tbl2); + dtlog_engine_free(eng); + dtlog_set_free(gv); + } + printf("- join and union\n"); +} + +static bool +test_ext_func(dtlog_engine_t* eng, dtlog_table_t* inp, dtlog_table_t* del_out, + dtlog_table_t* add_out) +{ + char buf[256]; + + if (inp == NULL) return true; /* Reset state. 
*/ + dtlog_value_t* tn = dtlog_map_get( + &eng->rule_set.rule_name_map, dtlog_i2p(del_out->table_index)); + if (strcmp(tn->value.a, "Mm2") != 0) return false; + + int32_t sz = 0; + sz = dtlog_table_print(buf, sz, inp, false); buf[sz] = 0; + printf(" run ext inp = %s\n", buf); + + dtlog_table_t* output = inp->is_remove ? del_out : add_out; + dtlog_value_t* tv[1]; + + DTLOG_SET_ALL(&inp->tuples, tuple, dtlog_tuple_t*) + int sz = sprintf(buf, "aa%scc", tuple->values[0]->value.a); + dtlog_value_t* nv = dtlog_value_init(buf, sz, inp->m.glb_values); + tv[0] = nv; + + dtlog_tuple_t* nt = dtlog_tuple_init_val(tv, 1); + dtlog_table_add(output, nt); + DTLOG_SET_END + + sz = 0; sz = dtlog_table_print(buf, sz, output, false); buf[sz] = 0; + printf(" run ext inp remove = %s\n", inp->is_remove ? "true" : "false"); + printf(" run ext out = %s\n", buf); + return true; +} + +static void +test_delta(void) +{ + char buf[1024]; + { + /* Delta with external function. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = + dtlog_eng_parse("M(a) : inp(a) Mm2(a); Mm2(a) : i(a) .", gv); + /* {0=inp, 1=i, 2=Mm2, 3=M} */ + + dtlog_eng_set_ext_func(eng, test_ext_func); + dtlog_table_t* tbl1 = dtlog_map_get(&eng->tables, 0); + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 1, 1, 0, gv); + + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:aaVbbcc", gv)); + dtlog_table_add(tbl2, dtlog_tuple_init_str("1:bb", gv)); + + int32_t mode = DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_TABLE); + dtlog_map_t* inp_remove = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_t* inp_insert = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_add(inp_insert, dtlog_i2p(tbl2->table_index), tbl2); + + dtlog_ary_t* res = dtlog_eng_delta(eng, inp_remove, inp_insert); + int32_t sz = 0; + sz = dtlog_array_print(buf, sz, res, false); buf[sz] = 0; + printf(" delta ext empty=%s\n", buf); + t_assert(dtlog_array_size(res) == 0, "delta ext func 0"); + + dtlog_array_free(res); + 
dtlog_map_free(inp_remove); dtlog_map_free(inp_insert); + dtlog_engine_free(eng); dtlog_set_free(gv); + } + { + /* Delta with external function. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = + dtlog_eng_parse("M(a) : inp(a) Mm2(a); Mm2(a) : i(a) .", gv); + /* {0=inp, 1=i, 2=Mm2, 3=M} */ + + dtlog_eng_set_ext_func(eng, test_ext_func); + dtlog_table_t* tbl1 = dtlog_map_get(&eng->tables, 0); + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 1, 1, 0, gv); + + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:aabbcc", gv)); + dtlog_table_add(tbl2, dtlog_tuple_init_str("1:bb", gv)); + dtlog_table_add(tbl2, dtlog_tuple_init_str("1:ee", gv)); + + int32_t mode = DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_TABLE); + dtlog_map_t* inp_remove = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_t* inp_insert = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_add(inp_insert, dtlog_i2p(tbl2->table_index), tbl2); + + dtlog_ary_t* res = dtlog_eng_delta(eng, inp_remove, inp_insert); + int32_t sz = 0; + sz = dtlog_array_print(buf, sz, res, false); buf[sz] = 0; + printf(" delta ext=%s\n", buf); + t_assert(0 == strcmp(buf, "[{1:aabbcc}]"), "delta ext 1"); + + sz = 0; + sz = dtlog_hash_print(buf, sz, &eng->tables, false); buf[sz] = 0; + printf(" all tables=%s\n", buf); + + dtlog_table_t* tbl2d = dtlog_table_init(NULL, 1, 1, 0, gv); + tbl2d->is_remove = true; + dtlog_table_add(tbl2d, dtlog_tuple_init_str("1:bb", gv)); + + dtlog_map_t* inp1_remove = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_t* inp1_insert = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_add(inp1_remove, dtlog_i2p(tbl2d->table_index), tbl2d); + + dtlog_ary_t* res1 = dtlog_eng_delta(eng, inp1_remove, inp1_insert); + sz = 0; sz = dtlog_array_print(buf, sz, res1, false); buf[sz] = 0; + printf(" delta ext rmv=%s\n", buf); + t_assert(0 == strcmp(buf, "[{1:aabbcc}]") && + ((dtlog_table_t*)dtlog_array_get(res1, 0))->is_remove == true, + "delta ext rmv 1"); + + sz = 0; + 
sz = dtlog_hash_print(buf, sz, &eng->tables, false); buf[sz] = 0; + + dtlog_array_free(res); + dtlog_map_free(inp_remove); dtlog_map_free(inp_insert); + dtlog_array_free(res1); + dtlog_map_free(inp1_remove); dtlog_map_free(inp1_insert); + + dtlog_engine_free(eng); dtlog_set_free(gv); + printf(" all tables=%s\n", buf); + } + printf("- delta and ext func\n"); +} + +static void +test_delta_misc(void) +{ + char buf[1024]; + { + /* Input's counter should be ignored. */ + int32_t sz; + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = + dtlog_eng_parse( + "Xy(a) : x(a, -); Xy1(a) : Xy(a); Y(a) : Xy1(a) .", gv); + + dtlog_table_t* tbl1 = dtlog_table_init(NULL, 0, 2, 0, gv); /* tbl x */ + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:1:1", gv)); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:1:2", gv)); + + int32_t mode = DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_TABLE); + dtlog_map_t* inp_remove = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_t* inp_insert = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_add(inp_insert, dtlog_i2p(tbl1->table_index), tbl1); + + dtlog_ary_t* res = dtlog_eng_delta(eng, inp_remove, inp_insert); + sz = 0; + sz = dtlog_hash_print(buf, sz, &eng->tables, false); buf[sz] = 0; + + printf(" delta, counter %s\n", buf); + t_assert(0 == strcmp(buf, "{0->{1:1:1,1:1:2},1->{2:1},2->{1:1}}"), + "delta, counter value 1"); + + dtlog_array_free(res); + dtlog_map_free(inp_remove); dtlog_map_free(inp_insert); + + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 0, 2, 0, gv); /* tbl x */ + tbl2->is_remove = true; + dtlog_table_add(tbl2, dtlog_tuple_init_str("1:1:1", gv)); + dtlog_table_add(tbl2, dtlog_tuple_init_str("1:1:2", gv)); + + dtlog_map_t* inp1_remove = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_t* inp1_insert = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_add(inp1_remove, dtlog_i2p(tbl2->table_index), tbl2); + + dtlog_ary_t* res1 = dtlog_eng_delta(eng, inp1_remove, inp1_insert); + /* No 
assert issue when minus. */ + sz = 0; sz = dtlog_array_print(buf, sz, res1, false); buf[sz] = 0; + + t_assert(0 == strcmp(buf, "[{1:1}]"), "delta, counter value 2"); + printf(" delta, counter minus %s\n", buf); + + dtlog_array_free(res1); + dtlog_map_free(inp1_remove); dtlog_map_free(inp1_insert); + dtlog_engine_free(eng); dtlog_set_free(gv); + } + { + /* Merge add and remove for normal join. */ + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = + dtlog_eng_parse("A(a) : b(a, b) c(b) .", gv); + + dtlog_table_t* tbl1 = dtlog_table_init(NULL, 1, 1, 0, gv); /* tbl c */ + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:1", gv)); + + int32_t mode = DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_TABLE); + dtlog_map_t* inp_remove = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_t* inp_insert = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_add(inp_insert, dtlog_i2p(tbl1->table_index), tbl1); + dtlog_ary_t* res = dtlog_eng_delta(eng, inp_remove, inp_insert); + + /* When testing, should reorder the 2 doJoin in delta(). 
*/ + dtlog_table_t* tbl2 = dtlog_table_init(NULL, 1, 1, 0, gv); /* tbl c */ + dtlog_table_t* tbl3 = dtlog_table_init(NULL, 0, 2, 0, gv); /* tbl b */ + + tbl2->is_remove = true; + dtlog_table_add(tbl2, dtlog_tuple_init_str("1:1", gv)); + dtlog_table_add(tbl3, dtlog_tuple_init_str("1:2:1", gv)); + + dtlog_map_t* inp1_remove = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_t* inp1_insert = dtlog_map_init(NULL, mode, 0, gv); + + dtlog_map_add(inp1_remove, dtlog_i2p(tbl2->table_index), tbl2); + dtlog_map_add(inp1_insert, dtlog_i2p(tbl3->table_index), tbl3); + dtlog_ary_t* res1 = dtlog_eng_delta(eng, inp1_remove, inp1_insert); + + int sz = 0; sz = dtlog_hash_print(buf, sz, &eng->tables, false); + buf[sz] = 0; + printf(" merge add and remove %s\n", buf); + t_assert(dtlog_array_size(res1) == 0, "merge add and remove"); + + dtlog_array_free(res); + dtlog_map_free(inp_remove); dtlog_map_free(inp_insert); + dtlog_array_free(res1); + dtlog_map_free(inp1_remove); dtlog_map_free(inp1_insert); + dtlog_engine_free(eng); dtlog_set_free(gv); + } + printf("- delta, misc\n"); +} + +static bool +decode_text_buf(dtlog_engine_t* eng, dtlog_buf_t* buf) +{ + dtlog_set_t* gv = eng->m.glb_values; + int32_t mode = DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_TABLE); + + eng->io.inp_remove = dtlog_map_init(NULL, mode, 0, gv); + eng->io.inp_insert = dtlog_map_init(NULL, mode, 0, gv); + + dtlog_value_t* null_str = dtlog_value_init("", 0, gv); + bool r = dtlog_io_decode_0(eng, buf->buf, buf->pos, NULL, null_str, + true, eng->io.inp_remove, eng->io.inp_insert); + dtlog_value_free(null_str, gv); + return r; +} + +static bool +decode_text(dtlog_engine_t* eng, const char** lines, int32_t n_lines) +{ + int32_t i; + dtlog_buf_t* buf = dtlog_buf_init(NULL); + + for (i = 0;i < n_lines;i++) { + int sz = strlen(lines[i]); + dtlog_buf_ensure(buf, sz + 1); + + strcpy(buf->buf + buf->pos, lines[i]); + buf->buf[buf->pos + sz] = dtlog_config.sep2; + buf->pos += sz + 1; + } + + bool r = decode_text_buf(eng, 
buf); + dtlog_buf_free(buf); + return r; +} + +static void +test_io(void) +{ + char buf[1024]; + { + dtlog_config_t sv_cfg = dtlog_config; + dtlog_config.sep1 = ':'; + dtlog_config.sep2 = '\n'; + dtlog_config.esc = '@'; + + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + + char* ts = "234 :@1@2@0Nb:@0@0\n"; + char* ts1 = strdup(ts); + int32_t len = strlen(ts); + + dtlog_set_global_value(gv); + ts1[11] = '\0'; /* 'N' */ + dtlog_tuple_t* t = dtlog_tuple_init_str_raw(ts1, &len, NULL, NULL, gv); + + char* ts2 = t->values[0]->value.a; + char* ts3 = t->values[1]->value.a; + + t_assert(t->count == 234 && + ts2[0] == ':' && ts2[1] == '\n' && ts2[2] == '@' && + ts2[3] == '\0' && ts2[4] == 'b' && t->values[0]->size == 5 && + ts3[0] == '@' && ts3[1] == '@' && t->values[1]->size == 2, + "io decode 1"); + + dtlog_buf_t* buf = dtlog_buf_init(NULL); + dtlog_tuple_print_raw(buf, t, 0); + buf->buf[buf->pos] = '\0'; + + bool nchar = buf->buf[10] == '\0'; + buf->buf[10] = 'N'; + t_assert(nchar && !strcmp(buf->buf, "234:@1@2@0Nb:@0@0\n"), + "io decode 2"); + + dtlog_config = sv_cfg; + dtlog_tuple_free(t, gv, true); + dtlog_buf_free(buf); + dtlog_set_free(gv); + free(ts1); + } + { + const char* d1[] = { "+:r", "1:a:b", "1:x:y" }; + const char* d2[] = { "+:r", "1:a:b", "-:r", "1:x:z", + "+:r", "1:a:c", "-:r", "1:a:c", "+:r", "1:a:c", + "-:r", "1:x:y", "+:r", "1:x:y", "-:r", "1:a:b" }; + + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = dtlog_eng_parse("R(a,b):r(a,b).", gv); + + decode_text(eng, d1, sizeof d1/sizeof(char*)); + dtlog_table_t* tbl = dtlog_map_get(eng->io.inp_insert, dtlog_i2p(0)); + dtlog_assert(dtlog_table_size(tbl) == 2); + + dtlog_ary_t* res = dtlog_eng_delta( + eng, eng->io.inp_remove, eng->io.inp_insert); + dtlog_map_free(eng->io.inp_remove); dtlog_map_free(eng->io.inp_insert); + dtlog_array_free(res); + + decode_text(eng, d2, sizeof d2/sizeof(char*)); + tbl = dtlog_map_get(eng->io.inp_insert, 
dtlog_i2p(0)); + dtlog_table_t* tbl1 = dtlog_map_get(eng->io.inp_remove, dtlog_i2p(0)); + dtlog_table_t* tbl2 = dtlog_map_get(eng->io.inp_insert, dtlog_i2p(0)); + + int32_t sz = 0; sz = dtlog_table_print(buf, sz, tbl1, false); + sz = dtlog_table_print(buf, sz, tbl2, false); buf[sz++] = 0; + t_assert(!strcmp("{1:a:b}{1:a:c}", buf), "io, check tuple 1"); + + dtlog_map_free(eng->io.inp_remove); dtlog_map_free(eng->io.inp_insert); + dtlog_engine_free(eng); + dtlog_set_free(gv); + } + { + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = dtlog_eng_parse("R(a,b):r(a,b).", gv); + + int32_t mode = DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_TABLE); + eng->io.inp_remove = dtlog_map_init(NULL, mode, 0, gv); + eng->io.inp_insert = dtlog_map_init(NULL, mode, 0, gv); + + dtlog_value_t* key1 = dtlog_value_init("k1", 0, gv); + dtlog_value_t* key2 = dtlog_value_init("k2", 0, gv); + dtlog_buf_t* buf1 = dtlog_buf_init(NULL); dtlog_buf_ensure(buf1, 128); + + strcpy(buf1->buf, "+:r\n1:v1\n1:v2\n"); + dtlog_io_decode_0(eng, buf1->buf, strlen(buf1->buf), key1, + NULL, false, eng->io.inp_remove, eng->io.inp_insert); + strcpy(buf1->buf, "+:r\n1:v3\n"); buf1->pos = strlen(buf1->buf); + dtlog_io_decode_0(eng, buf1->buf, strlen(buf1->buf), key2, + NULL, false, eng->io.inp_remove, eng->io.inp_insert); + + dtlog_ary_t* res = dtlog_eng_delta( + eng, eng->io.inp_remove, eng->io.inp_insert); + dtlog_map_t* buf_all = dtlog_map_init(NULL, + DTLOG_KEY(DTLOG_T_VALUE) | DTLOG_VALUE(DTLOG_T_BUF), 0, gv); + + dtlog_io_encode_extra(eng, res, buf_all); + int32_t sz = 0; + sz = dtlog_coll_print(buf, sz, buf_all, DTLOG_T_MAP, false); + buf[sz++] = 0; printf(" encode extra=%s\n", buf); + + t_assert(!strcmp(buf, "{k2->[+:R\n1:v3\n],k1->[+:R\n1:v1\n1:v2\n]}"), + "io encode extra 1"); + + dtlog_map_free(buf_all); dtlog_buf_free(buf1); + dtlog_map_free(eng->io.inp_remove); dtlog_map_free(eng->io.inp_insert); + dtlog_array_free(res); + 
dtlog_value_free(key1, gv); dtlog_value_free(key2, gv); + + dtlog_engine_free(eng); + dtlog_set_free(gv); + } + printf("- io\n"); +} + +static void +test_delta_more(void) +{ + const char* rules = + +"LS_host_set(ls_id, host_id) : logical_switch(ls_id, port_id) " +"port_bind(port_id, vif_id) dest_place(host_id, vif_id); " +"LS_HOST_SET(host_id, ls_id, host_id_item) : LS_host_set(ls_id, host_id) " +"LS_host_set(ls_id, host_id_item); " +"LS_HOST_TUNNEL(host_id, host_id_item, tunnel_ip) : " +"LS_host_set(ls_id, host_id) " +"LS_host_set(ls_id, host_id_item) dest_tunnel(host_id_item,tunnel_ip) . "; + + const char* d1[] = { + "+:logical_switch", "1:ls1:lp1", + "+:port_bind", "1:lp1:vif1", + "+:dest_place", "1:h1:vif1", + "+:dest_tunnel", "1:h1:ip1" + }; + + const char* d2[] = { + "+:port_bind", "1:lp2:vif2", + "+:dest_place", "1:h2:vif2", + "+:logical_switch", "1:ls1:lp2", + "+:dest_tunnel", "1:h2:ip2" + }; + + const char* d3[] = { + "-:dest_place", "1:h2:vif2", + }; + + const char* d4[] = { + "-:logical_switch", "1:ls1:lp1", + }; + + const char* d5[] = { + "-:logical_switch", "1:ls1:lp2", + "+:port_bind", "1:lp1:vif1", "1:lp2:vif2", + "+:dest_place", "1:h1:vif1", + "+:dest_tunnel", "1:h1:ip1", "1:h2:ip2" + }; + + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = dtlog_eng_parse(rules, gv); + + t_assert1(decode_text(eng, d1, sizeof d1/sizeof(char*)), "delta more 0"); + dtlog_ary_t* res = dtlog_eng_delta( + eng, eng->io.inp_remove, eng->io.inp_insert); + dtlog_buf_t* buf = dtlog_io_encode(eng, res); buf->buf[buf->pos] = '\0'; + + t_assert(0 == strcmp(buf->buf, + "+:LS_HOST_TUNNEL\n1:h1:h1:ip1\n+:LS_HOST_SET\n1:h1:ls1:h1\n"), + "delta more 1"); + + dtlog_map_free(eng->io.inp_remove); dtlog_map_free(eng->io.inp_insert); + dtlog_array_free(res); dtlog_buf_free(buf); + + t_assert1(decode_text(eng, d2, sizeof d2/sizeof(char*)), "delta more 2"); + res = dtlog_eng_delta(eng, eng->io.inp_remove, eng->io.inp_insert); + buf = 
dtlog_io_encode(eng, res); buf->buf[buf->pos] = '\0'; + + t_assert(0 == strcmp(buf->buf, + "+:LS_HOST_TUNNEL\n1:h2:h1:ip1\n1:h1:h2:ip2\n1:h2:h2:ip2\n" + "+:LS_HOST_SET\n1:h2:ls1:h2\n1:h1:ls1:h2\n1:h2:ls1:h1\n"), + "delta more 3"); + + dtlog_map_free(eng->io.inp_remove); dtlog_map_free(eng->io.inp_insert); + dtlog_array_free(res); dtlog_buf_free(buf); + + t_assert1(decode_text(eng, d3, sizeof d3/sizeof(char*)), "delta more 4"); + res = dtlog_eng_delta(eng, eng->io.inp_remove, eng->io.inp_insert); + buf = dtlog_io_encode(eng, res); buf->buf[buf->pos] = '\0'; + + t_assert(0 == strcmp(buf->buf, + "-:LS_HOST_TUNNEL\n1:h2:h1:ip1\n1:h1:h2:ip2\n1:h2:h2:ip2\n" + "-:LS_HOST_SET\n1:h2:ls1:h2\n1:h1:ls1:h2\n1:h2:ls1:h1\n" + ), "delta more 5"); + + dtlog_map_free(eng->io.inp_remove); dtlog_map_free(eng->io.inp_insert); + dtlog_array_free(res); dtlog_buf_free(buf); + + t_assert1(decode_text(eng, d4, sizeof d4/sizeof(char*)), "delta more 6"); + res = dtlog_eng_delta(eng, eng->io.inp_remove, eng->io.inp_insert); + buf = dtlog_io_encode(eng, res); buf->buf[buf->pos] = '\0'; + + t_assert(0 == strcmp(buf->buf, + "-:LS_HOST_TUNNEL\n1:h1:h1:ip1\n-:LS_HOST_SET\n1:h1:ls1:h1\n" + ), "delta more 7"); + + dtlog_map_free(eng->io.inp_remove); dtlog_map_free(eng->io.inp_insert); + dtlog_array_free(res); dtlog_buf_free(buf); + + t_assert1(decode_text(eng, d5, sizeof d5/sizeof(char*)), "delta more 8"); + res = dtlog_eng_delta(eng, eng->io.inp_remove, eng->io.inp_insert); + buf = dtlog_io_encode(eng, res); buf->buf[buf->pos] = '\0'; + + t_assert(0 == strcmp(buf->buf, ""), "delta more 9"); + dtlog_map_free(eng->io.inp_remove); dtlog_map_free(eng->io.inp_insert); + dtlog_array_free(res); dtlog_buf_free(buf); + + dtlog_engine_free(eng); + dtlog_set_free(gv); + printf("- delta, more\n"); +} + +static int64_t +calc_tm(struct timespec* ts0, struct timespec* ts1) +{ + /* Widen before multiplying so 32-bit time_t/long cannot overflow. */ + int64_t t0 = (int64_t)ts0->tv_sec * 1000 * 1000 + ts0->tv_nsec / 1000; + int64_t t1 = (int64_t)ts1->tv_sec * 1000 * 1000 + ts1->tv_nsec / 1000; + return t1 
- t0; +} + +static void +test_join_perf(int32_t sz1, int32_t sz2) +{ + /* Correct value for full join is sz1 * sz2 * sz2. + * Correct value for delta join is sz2 * 2 + 1. + */ + struct timespec ts0, ts1; + + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = dtlog_eng_parse( + "TABLE(b, a, c) : table1(a, b) table1(a, c).", gv); + + int32_t mode = DTLOG_KEY(DTLOG_T_INT32) | DTLOG_VALUE(DTLOG_T_TABLE); + dtlog_map_t* inp_remove = dtlog_map_init(NULL, mode, 0, gv); + dtlog_map_t* inp_insert = dtlog_map_init(NULL, mode, 0, gv); + + int32_t i, j; + char buf[1024]; + + dtlog_table_t* tbl1 = dtlog_table_init(NULL, 0, 2, 0, gv); + for (i = 0;i < sz1;i++) { + for (j = 0;j < sz2;j++) { + sprintf(buf, "1:ID_%d:XY_%d", i, j); + dtlog_tuple_t* tuple = dtlog_tuple_init_str(buf, gv); + dtlog_table_add(tbl1, tuple); + } + } + + dtlog_map_add(inp_insert, dtlog_i2p(tbl1->table_index), tbl1); + + clock_gettime(CLOCK_MONOTONIC, &ts0); + dtlog_ary_t* res = dtlog_eng_delta(eng, inp_remove, inp_insert); + clock_gettime(CLOCK_MONOTONIC, &ts1); + dtlog_table_t* res_tbl = dtlog_array_get(res, 0); + + printf(" join sz1=%d sz2=%d\n", sz1, sz2); + printf(" full run = %d buckets = %d\n", + dtlog_table_size(res_tbl), res_tbl->tuples.len); + + printf(" time = %" PRId64 " us\n", calc_tm(&ts0, &ts1)); + dtlog_map_free(inp_remove); + dtlog_map_free(inp_insert); + dtlog_array_free(res); + + inp_remove = dtlog_map_init(NULL, mode, 0, gv); + inp_insert = dtlog_map_init(NULL, mode, 0, gv); + + tbl1 = dtlog_table_init(NULL, 0, 2, 0, gv); + dtlog_table_add(tbl1, dtlog_tuple_init_str("1:ID_0:XY_new_item", gv)); + dtlog_map_add(inp_insert, dtlog_i2p(tbl1->table_index), tbl1); + + clock_gettime(CLOCK_MONOTONIC, &ts0); + res = dtlog_eng_delta(eng, inp_remove, inp_insert); + clock_gettime(CLOCK_MONOTONIC, &ts1); + + res_tbl = dtlog_array_get(res, 0); + printf(" delta run = %d\n", dtlog_table_size(res_tbl)); + printf(" time = %" PRId64 " us\n", 
calc_tm(&ts0, &ts1)); + dtlog_map_free(inp_remove); + dtlog_map_free(inp_insert); + dtlog_array_free(res); + + dtlog_engine_free(eng); + dtlog_set_free(gv); + printf("- join perf\n"); +} + +static void +read_lines(dtlog_buf_t* buf) +{ + char line[1024]; + for (;;) { + /* Bound the read to the buffer; stop on end of input or error. */ + if (scanf("%1023s", line) != 1) break; + if (!strcmp(line, "EOF")) break; + int32_t sz = strlen(line); + + dtlog_buf_ensure(buf, sz + 1); + strcpy(buf->buf + buf->pos, line); + buf->buf[buf->pos + sz] = dtlog_config.sep2; + buf->pos += sz + 1; + } + + dtlog_buf_ensure(buf, 1); + buf->buf[buf->pos] = '\0'; +} + +static void +test_interactive(void) +{ + /* Interactive engine for testing purposes. */ + dtlog_buf_t* inp = dtlog_buf_init(NULL); + printf("Input rules, e.g. R(a):r(a).\nUse EOF as end.\n"); + read_lines(inp); + + dtlog_set_t* gv = dtlog_set_init(NULL, 0, 0, NULL); + dtlog_set_global_value(gv); + dtlog_engine_t* eng = dtlog_eng_parse(inp->buf, gv); + + for (;;) { + printf("Input changes, e.g. +:tbl_name|1:f0:f1|EOF\n" + "Use +, -, ? for add, remove, or query.\n" + "Multiple tables may be given.\n" + "'|' stands for new line. Use EOF to exit.\n" + "For a query, field values may be empty.\n"); + + inp->pos = 0; + read_lines(inp); + if (inp->pos == 0) break; + + if (!decode_text_buf(eng, inp)) printf("Error in format.\n"); + else { + bool is_query = inp->buf[0] == '?'; + dtlog_ary_t* res = is_query ? 
+ dtlog_eng_query(eng, eng->io.inp_insert) : + dtlog_eng_delta(eng, eng->io.inp_remove, eng->io.inp_insert); + + dtlog_buf_t* out = dtlog_io_encode(eng, res); + dtlog_buf_ensure(out, 1); out->buf[out->pos] = '\0'; + printf("Output\n%s\n", out->buf); + + dtlog_map_free(eng->io.inp_remove); + dtlog_map_free(eng->io.inp_insert); + dtlog_array_free(res); + dtlog_buf_free(out); + } + } + + dtlog_buf_free(inp); + dtlog_engine_free(eng); + dtlog_set_free(gv); +} + +static void +test_api(void) +{ + const char* p, *p1, *p2, *p3, *p4, *p5; + int32_t sz, sz1, sz2; + bool rmv, rmv1; + + void* eng = dtlog_init("R2(a,b):r2(a,b); R1(a):r1(a).", + /* Ext func not provided. */NULL); + + rmv = dtlog_put_table(eng, /* adding */false, "r1"); + /* First tuple for r1. */ + dtlog_put_field(eng, "r_1a", 0); + + rmv1 = dtlog_put_table(eng, false, "r2"); + /* First tuple for r2. */ + dtlog_put_field(eng, "r2_1a", /* c-str*/ 0); + dtlog_put_field(eng, "r2_1bx", /* len */5); + /* Second tuple r2. */ + dtlog_put_field(eng, "r2_2a", 0); + dtlog_put_field(eng, "r2_2b", 0); + + dtlog_opr(eng, /* delta change */false); + t_assert(rmv && rmv1, "api 0"); + + /* Get output table R2. */ + rmv1 = dtlog_get_table(eng, &rmv, &p, &sz, &sz1); + + t_assert(0 == strcmp(p, "R2") && sz == 2 && sz1 ==2 && + rmv == false && rmv1 == true, "api 1"); + + /* First tuple. */ + dtlog_get_field(eng, &p1, &sz); + dtlog_get_field(eng, &p2, &sz1); + /* Second tuple. */ + dtlog_get_field(eng, &p3, &sz); + rmv = dtlog_get_field(eng, &p4, &sz); + /* Indicates reaching next table. */ + rmv1 = dtlog_get_field(eng, &p5, &sz); + + t_assert(0 == strcmp(p1, "r2_1a") && 0 == strcmp(p2, "r2_1b") && + 0 == strcmp(p3, "r2_2a") && 0 == strcmp(p4, "r2_2b") + && sz == 5 && sz1 == 5 && rmv == true + && rmv1 == false, "api 2"); + + /* Get output table R1. 
*/ + rmv1 = dtlog_get_table(eng, &rmv, &p, &sz, &sz1); + t_assert(0 == strcmp(p, "R1") && sz == 1 && sz1 == 1 && + rmv == false && rmv1 == true, "api 3"); + + rmv = dtlog_get_field(eng, &p1, &sz); + /* Indicates reaching next table. */ + rmv1 = dtlog_get_field(eng, &p2, &sz); + + t_assert(0 == strcmp(p1, "r_1a") && + sz == 4 && rmv == true && rmv1 == false, "api 4"); + + /* No more tables returned. */ + rmv = dtlog_get_table(eng, &rmv, &p, &sz, &sz1); + t_assert(rmv == false, "api 5"); + + dtlog_put_table(eng, false, "r2"); + dtlog_put_field(eng, "r2_1a0", 0); + dtlog_put_field(eng, "r2_1b", 0); + dtlog_opr(eng, false); + dtlog_get_table(eng, &rmv, &p, &sz, &sz1); + /* It is ok to skip tuples of a table. */ + dtlog_get_table(eng, &rmv, &p, &sz, &sz1); + + dtlog_put_table(eng, false, "r2"); + dtlog_put_field(eng, NULL, 0); + dtlog_put_field(eng, "r2_1b", 0); + dtlog_opr(eng, /* query */true); + + rmv = dtlog_get_table(eng, &rmv, &p, &sz2, &sz1); + dtlog_get_field(eng, &p1, &sz); + dtlog_get_field(eng, &p2, &sz); + dtlog_get_field(eng, &p3, &sz); + dtlog_get_field(eng, &p4, &sz); + rmv1 = dtlog_get_table(eng, &rmv, &p, &sz, &sz1); + + t_assert(0 == strcmp(p1, "r2_1a") && sz2 == 2 && + sz1 == 2 && rmv == true && rmv1 == false, "api 6"); + + dtlog_free(eng); + printf("- api\n"); +} + +static void +test_datalog(int argc, char** argv) +{ + if (argc == 2 && !strcmp(argv[1], "test")) { + test_collections(); + test_tables(); + test_sort(); + test_sync(); + test_join(); + test_io(); + test_delta(); + test_delta_misc(); + test_delta_more(); + test_join_perf(100, 100); + test_api(); + + t_sum(); + fprintf(stderr, "%s\n", /* for at script */ + dtlog_tst_no_cases_failed == 0 ? 
"PASS" : "FAIL"); + } + else if (argc == 2 && !strcmp(argv[1], "run")) + test_interactive(); + else printf("usage: test-datalog test|run\nrun for interactive mode\n"); +} + +#ifndef USE_OUTSIDE_OVS +OVSTEST_REGISTER("test-datalog", test_datalog); +#else +int main(int argc, char** argv) { test_datalog(argc, argv); return 0; } +#endif + diff --git a/tests/testsuite.at b/tests/testsuite.at index f5f1253..abf37cf 100644 --- a/tests/testsuite.at +++ b/tests/testsuite.at @@ -76,3 +76,4 @@ m4_include([tests/ovn-nbctl.at]) m4_include([tests/ovn-sbctl.at]) m4_include([tests/ovn-controller.at]) m4_include([tests/ovn-controller-vtep.at]) +m4_include([tests/datalog.at])